便笺: Python 流式渲染 Markdown 实现,适用于 AI 终端聊天输出

公開日: 2026-04-25 14:37 更新日: 2026-04-25 14:37 914文字 5 min read

この投稿は「日本語」では表示できません。元の投稿を表示しています。
使用 Rich 库的 Live 组件实现 Markdown 流式渲染,智能识别代码块与段落,实时输出到终端。

前言

想了一晚上弄出来的。。。

使用 Rich 库的 Live 组件实现 Markdown 流式渲染,智能识别代码块与段落,实时输出到终端。

效果

效果展示
效果展示

代码

代码
import re
import time
from rich.console import Console
from rich.live import Live
from rich.markdown import Markdown
from rich.text import Text
from typing import Optional

class SmartStreamingMarkdown:
    """
    智能流式Markdown渲染器。
    Live区域始终只显示纯文本缓冲区,完整段落会立即被渲染并输出到Live区域外。
    """
    def __init__(self, console: Optional[Console] = None, refresh_per_second: int = 4):
        self.console = console or Console()
        self.refresh_per_second = refresh_per_second
        
        # 核心缓冲区
        self._live_buffer_text = Text("")  # Live区域显示的纯文本
        self._renderable_md_buffer = ""    # 已累积的、可渲染的完整Markdown文本
        
        # 状态跟踪
        self._in_code_block = False
        self._code_block_start = re.compile(r'^```[a-zA-Z0-9_+#-]*\s*$', re.MULTILINE)
        self._code_block_end = re.compile(r'^```\s*$', re.MULTILINE)
        
        # 内部Live对象
        self._live = None
        # 新增:用于暂存已识别但未结束的代码块起始行
        self._pending_code_block_start_line = None

    def _process_and_flush_complete_blocks(self):
        """处理缓冲区,尝试提取并渲染已完成的段落/代码块。"""
        buffer_str = self._live_buffer_text.plain
        
        # 处理逻辑:查找可以切割的完整块
        # 1. 处理代码块
        if not self._in_code_block:
            # 检查是否有代码块开始
            start_match = self._code_block_start.search(buffer_str)
            if start_match:
                self._in_code_block = True
                # ***** 修复开始 *****
                # 立即从Live缓冲区中移除代码块起始行,并暂存
                cut_point = start_match.end()
                self._pending_code_block_start_line = buffer_str[:cut_point]
                self._live_buffer_text = Text(buffer_str[cut_point:])
                # 更新缓冲区字符串,用于本次函数的后续处理
                buffer_str = self._live_buffer_text.plain
                # ***** 修复结束 *****
        if self._in_code_block:
            # 在代码块内,寻找结束标记
            end_match = self._code_block_end.search(buffer_str)
            if end_match:
                # 找到结束位置
                cut_point = end_match.end()
                completed_code_body = buffer_str[:cut_point]
                # 从Live缓冲区移除已完成的部分
                self._live_buffer_text = Text(buffer_str[cut_point:])
                # 将暂存的开始行与代码块主体合并,然后渲染
                full_code_block = self._pending_code_block_start_line + completed_code_body
                self._renderable_md_buffer += full_code_block
                self.console.print(Markdown(full_code_block))
                # 同时打印个空行
                self.console.print()
                # 重置状态
                self._in_code_block = False
                self._pending_code_block_start_line = None
                # 递归处理剩余部分
                self._process_and_flush_complete_blocks()
                return
            else:
                # 代码块未结束,不做任何处理,保留在Live区域
                return
        
        # 2. 处理普通段落 (基于双换行符)
        para_end = buffer_str.find('\n\n')
        if para_end != -1:
            cut_point = para_end + 2  # 包含双换行符
            completed_block = buffer_str[:cut_point]
            # 从Live缓冲区移除并渲染
            self._live_buffer_text = Text(buffer_str[cut_point:])
            self._renderable_md_buffer += completed_block
            self.console.print(Markdown(completed_block))
            # 同时打印个空行
            self.console.print()
            # 递归处理
            self._process_and_flush_complete_blocks()
            return
        # 若无完整块,则保持原样在Live区域显示

    def start(self):
        """启动Live显示上下文。"""
        self._live = Live(self._live_buffer_text, console=self.console, refresh_per_second=self.refresh_per_second, auto_refresh=True)
        self._live.start()

    def append(self, text_fragment: str):
        """接收新的文本片段,更新缓冲区并尝试渲染。"""
        if self._live is None:
            self.start()
        
        # 1. 将新文本追加到Live缓冲区
        self._live_buffer_text.plain += text_fragment
        
        # 2. 尝试处理并刷新任何已完成的块
        self._process_and_flush_complete_blocks()
        
        # 3. 更新Live显示(此时只显示未处理的缓冲文本)
        self._live.update(self._live_buffer_text)

    def finish(self):
        """结束流式处理,渲染所有剩余内容。"""
        if self._live_buffer_text.plain:
            # 将Live缓冲区所有剩余内容作为最后一块渲染
            final_md = self._live_buffer_text.plain
            if final_md.strip():
                self.console.print(Markdown(final_md))
                # 同时打印个空行
                self.console.print()
            self._live_buffer_text = Text("")
            self._live.update(self._live_buffer_text)
        if self._live:
            self._live.stop()

# 使用示例
if __name__ == "__main__":
    console = Console()
    streamer = SmartStreamingMarkdown(console=console, refresh_per_second=12)

    # 模拟一个流式数据源
    def simulated_stream():
        message_parts = [
            "# 这是一个标题\n\n",
            "这是一个段落,它会在遇到双换行符后立即被渲染。",
            "而这段还在缓冲。\n\n",
            "现在上面的段落被渲染了,这是新段落。\n\n",
            "```python\n# 进入代码块\nimport sys\n\n",
            "def hello():\n    print('world')\n",
            "```\n\n",  # 代码块结束标记
            "代码块已结束并渲染,这是后续文本。",
            "\nbhn\n``","`pyth","on\n# 进入代码块\nimpor","t sys\n\n",
            "def he","llo():\n    print('world')\n",
            "`","``\n\n",  # 代码块结束标记
        ]
        for part in message_parts:
            yield part
            time.sleep(0.1)  # 模拟网络延迟

    try:
        for chunk in simulated_stream():
            streamer.append(chunk)
    finally:
        streamer.finish()
    console.print("\n[bold green]流式渲染完成![/bold green]")

気に入ったならばコメントを残してくださいね~