ollama中Phi-4-mini-reasoning的流式输出实现:SSE接口调用完整示例

本文详细讲解如何在ollama中通过SSE接口实现Phi-4-mini-reasoning模型的流式输出,包含完整代码示例和实际应用场景。

1. 什么是流式输出和SSE接口

流式输出是一种让AI模型逐步返回生成结果的技术,而不是等待整个回答生成完毕后再一次性返回。这种方式特别适合需要实时显示生成过程的场景。

SSE(Server-Sent Events)是一种服务器向客户端推送数据的技术标准,它基于HTTP协议,允许服务器主动向客户端发送数据更新。相比传统的轮询方式,SSE更加高效和实时。

在ollama中使用SSE接口调用Phi-4-mini-reasoning模型,可以让你:

  • 实时看到模型思考过程
  • 减少用户等待焦虑感
  • 在生成过程中随时中断
  • 获得更好的交互体验

2. 环境准备与基础配置

2.1 确保ollama正常运行

首先确认你的ollama服务已经启动并正常运行:

# 检查ollama服务状态
ollama serve

# 确认Phi-4-mini-reasoning模型已下载
ollama list

如果还没有下载Phi-4-mini-reasoning模型,可以通过以下命令下载:

# 拉取最新版本的Phi-4-mini-reasoning模型
ollama pull phi-4-mini-reasoning

2.2 测试基础API调用

在开始流式输出前,先测试一下基础的非流式API调用是否正常:

curl http://localhost:11434/api/generate -d '{
  "model": "phi-4-mini-reasoning",
  "prompt": "你好,请介绍一下你自己",
  "stream": false
}'

如果这个命令能正常返回结果,说明ollama服务和模型都配置正确。

3. SSE流式输出完整实现

3.1 基础SSE接口调用

下面是一个最简单的SSE流式输出示例,使用JavaScript实现:

async function streamPhi4Response(prompt) {
  const response = await fetch('http://localhost:11434/api/generate', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      model: 'phi-4-mini-reasoning',
      prompt: prompt,
      stream: true  // 关键参数:启用流式输出
    })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    
    const chunk = decoder.decode(value);
    const lines = chunk.split('\n').filter(line => line.trim());
    
    for (const line of lines) {
      try {
        const data = JSON.parse(line.replace('data: ', ''));
        if (data.response) {
          process.stdout.write(data.response);
        }
        if (data.done) {
          process.stdout.write('\n');
        }
      } catch (e) {
        // 忽略解析错误
      }
    }
  }
}

// 使用示例
streamPhi4Response('请用中文解释什么是机器学习');

3.2 Python实现示例

如果你更喜欢使用Python,这里是一个完整的Python实现:

import requests
import json

def stream_phi4_response(prompt):
    url = "http://localhost:11434/api/generate"
    payload = {
        "model": "phi-4-mini-reasoning",
        "prompt": prompt,
        "stream": True
    }
    
    response = requests.post(url, json=payload, stream=True)
    
    for line in response.iter_lines():
        if line:
            try:
                # 解码并移除"data: "前缀
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    json_data = json.loads(decoded_line[6:])
                    
                    if 'response' in json_data:
                        print(json_data['response'], end='', flush=True)
                    
                    if json_data.get('done', False):
                        print()  # 换行
                        
            except json.JSONDecodeError:
                continue

# 使用示例
if __name__ == "__main__":
    stream_phi4_response("请写一个关于人工智能的短故事")

3.3 带错误处理的完整实现

在实际应用中,我们需要添加完善的错误处理机制:

class Phi4StreamClient {
  constructor(baseURL = 'http://localhost:11434') {
    this.baseURL = baseURL;
  }

  async streamGenerate(prompt, options = {}) {
    const {
      onMessage = (data) => console.log(data.response),
      onError = (error) => console.error('Error:', error),
      onComplete = () => console.log('\nStream completed'),
      temperature = 0.7,
      max_tokens = 2048
    } = options;

    try {
      const response = await fetch(`${this.baseURL}/api/generate`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          model: 'phi-4-mini-reasoning',
          prompt: prompt,
          stream: true,
          options: {
            temperature: temperature,
            num_predict: max_tokens
          }
        })
      });

      if (!response.ok) {
        throw new Error(`HTTP error! status: ${response.status}`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          onComplete();
          break;
        }
        
        const chunk = decoder.decode(value);
        const lines = chunk.split('\n').filter(line => line.trim());
        
        for (const line of lines) {
          try {
            if (line.startsWith('data: ')) {
              const data = JSON.parse(line.slice(6));
              onMessage(data);
            }
          } catch (e) {
            onError(new Error(`Failed to parse line: ${line}`));
          }
        }
      }
      
    } catch (error) {
      onError(error);
    }
  }
}

// 使用示例
const client = new Phi4StreamClient();

client.streamGenerate('解释一下深度学习的基本概念', {
  onMessage: (data) => {
    process.stdout.write(data.response || '');
  },
  onComplete: () => {
    console.log('\n\n生成完成!');
  },
  onError: (error) => {
    console.error('发生错误:', error.message);
  }
});

4. 高级功能与实用技巧

4.1 实时进度显示

你可以在流式输出时显示生成进度:

let totalTokens = 0;
let receivedTokens = 0;

client.streamGenerate('长文本生成测试', {
  onMessage: (data) => {
    if (data.response) {
      process.stdout.write(data.response);
      receivedTokens++;
    }
    
    if (typeof data.total_duration !== 'undefined') {
      totalTokens = data.eval_count;
    }
    
    // 显示进度
    if (totalTokens > 0) {
      const progress = ((receivedTokens / totalTokens) * 100).toFixed(1);
      process.stdout.write(` [${progress}%]`);
    }
  }
});

4.2 中断生成功能

流式输出的一个重要优势是可以随时中断生成过程:

let controller = null;

function startGeneration(prompt) {
  controller = new AbortController();
  
  fetch('http://localhost:11434/api/generate', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      model: 'phi-4-mini-reasoning',
      prompt: prompt,
      stream: true
    }),
    signal: controller.signal
  })
  .then(response => {
    // 处理流式响应
  })
  .catch(error => {
    if (error.name === 'AbortError') {
      console.log('生成已中断');
    }
  });
}

// 中断生成
function stopGeneration() {
  if (controller) {
    controller.abort();
    controller = null;
  }
}

4.3 上下文保持对话

对于多轮对话,可以使用上下文保持功能:

let conversationContext = [];

async function chatWithPhi4(message) {
  conversationContext.push({ role: 'user', content: message });
  
  const response = await fetch('http://localhost:11434/api/chat', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      model: 'phi-4-mini-reasoning',
      messages: conversationContext,
      stream: true
    })
  });
  
  let fullResponse = '';
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    
    const chunk = decoder.decode(value);
    const lines = chunk.split('\n').filter(line => line.trim());
    
    for (const line of lines) {
      try {
        if (line.startsWith('data: ')) {
          const data = JSON.parse(line.slice(6));
          if (data.message && data.message.content) {
            process.stdout.write(data.message.content);
            fullResponse += data.message.content;
          }
        }
      } catch (e) {
        // 忽略解析错误
      }
    }
  }
  
  conversationContext.push({ role: 'assistant', content: fullResponse });
  return fullResponse;
}

5. 常见问题与解决方案

5.1 连接超时问题

如果遇到连接超时,可以调整超时设置:

// 增加超时时间
const response = await fetch('http://localhost:11434/api/generate', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    model: 'phi-4-mini-reasoning',
    prompt: prompt,
    stream: true
  }),
  timeout: 30000  // 30秒超时
});

5.2 内存占用优化

对于长时间运行的流式请求,需要注意内存管理:

function createMemoryEfficientStream() {
  let buffer = '';
  
  return new TransformStream({
    transform(chunk, controller) {
      buffer += chunk;
      const lines = buffer.split('\n');
      
      // 保留最后的不完整行
      buffer = lines.pop() || '';
      
      for (const line of lines) {
        if (line.trim()) {
          controller.enqueue(line);
        }
      }
    },
    
    flush(controller) {
      if (buffer.trim()) {
        controller.enqueue(buffer);
      }
    }
  });
}

5.3 网络中断重连

实现自动重连机制:

async function robustStreamRequest(prompt, maxRetries = 3) {
  let retries = 0;
  
  while (retries < maxRetries) {
    try {
      const response = await fetch('http://localhost:11434/api/generate', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          model: 'phi-4-mini-reasoning',
          prompt: prompt,
          stream: true
        })
      });
      
      return response;
      
    } catch (error) {
      retries++;
      if (retries >= maxRetries) {
        throw error;
      }
      
      console.log(`连接失败,第${retries}次重试...`);
      await new Promise(resolve => setTimeout(resolve, 1000 * retries));
    }
  }
}

6. 实际应用场景

6.1 实时聊天应用

将SSE流式输出集成到聊天应用中:

// 前端代码示例
const chatForm = document.getElementById('chat-form');
const messageInput = document.getElementById('message-input');
const chatMessages = document.getElementById('chat-messages');

chatForm.addEventListener('submit', async (e) => {
  e.preventDefault();
  
  const message = messageInput.value.trim();
  if (!message) return;
  
  // 添加用户消息到聊天界面
  addMessage('user', message);
  messageInput.value = '';
  
  // 创建AI消息元素
  const aiMessageElement = addMessage('ai', '');
  
  try {
    const response = await fetch('/api/chat', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ message })
    });
    
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let aiResponse = '';
    
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      
      const chunk = decoder.decode(value);
      const lines = chunk.split('\n').filter(line => line.trim());
      
      for (const line of lines) {
        try {
          const data = JSON.parse(line.replace('data: ', ''));
          if (data.response) {
            aiResponse += data.response;
            aiMessageElement.textContent = aiResponse;
          }
        } catch (e) {
          // 忽略解析错误
        }
      }
    }
    
  } catch (error) {
    aiMessageElement.textContent = '抱歉,发生错误:' + error.message;
  }
});

function addMessage(role, content) {
  const messageDiv = document.createElement('div');
  messageDiv.className = `message ${role}`;
  messageDiv.textContent = content;
  chatMessages.appendChild(messageDiv);
  chatMessages.scrollTop = chatMessages.scrollHeight;
  return messageDiv;
}

6.2 代码生成与实时预览

对于代码生成场景,流式输出特别有用:

async function generateCodeWithLivePreview(requirement) {
  console.log('开始生成代码...\n');
  
  let generatedCode = '';
  
  await client.streamGenerate(`根据以下需求生成代码:${requirement}`, {
    onMessage: (data) => {
      if (data.response) {
        process.stdout.write(data.response);
        generatedCode += data.response;
        
        // 实时语法高亮预览(简化示例)
        if (generatedCode.includes('\n')) {
          console.log('\n--- 实时预览 ---');
          console.log(generatedCode);
          console.log('---------------\n');
        }
      }
    },
    onComplete: () => {
      console.log('\n\n代码生成完成!');
      console.log('最终代码:');
      console.log(generatedCode);
    }
  });
  
  return generatedCode;
}

7. 总结

通过SSE接口实现Phi-4-mini-reasoning的流式输出,能够显著提升用户体验,让AI交互更加自然和实时。本文提供的完整示例涵盖了从基础调用到高级应用的各个方面,你可以根据实际需求进行调整和扩展。

关键要点总结:

  1. 流式输出优势:实时显示生成过程,减少等待时间,支持中断操作
  2. SSE接口使用:通过设置stream: true参数启用流式输出
  3. 错误处理:完善的错误处理机制确保应用稳定性
  4. 性能优化:内存管理和网络优化提升用户体验
  5. 实际应用:适用于聊天应用、代码生成、内容创作等多种场景

记得在实际部署时考虑网络环境、错误处理和用户体验的平衡,这样才能打造出真正好用的AI应用。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐