简介
- 流式响应(Streaming Response)是一种数据传输方式,通常用于网络通信和数据处理中。在这种模式下,数据不是一次性发送或接收,而是以连续的流的形式进行传输。
开发环境
- 开发工具:IntelliJ IDEA
- JDK:OpenJDK 17.0.7
代码
- 代码中流式返回每次响应一个字符,实际可以返回其他格式,比如返回
JSON
格式字符串,可以更灵活的返回数据 - 接口代码
package com.hyx.ssl.modules.cert.controller;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import java.io.PrintWriter;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@RestController
@RequestMapping
public class CertInfoController {
private Set<String> chatIds = new HashSet<>();
/**
* 对话接口
*/
@PostMapping("/v1/chat/completions")
public ResponseEntity<StreamingResponseBody> chatCompletions(@RequestBody Map<String, Object> body) {
//获取参数
String chatId = (String) body.get("chatId");
//记录对话id,用于判断是否打断响应
if (chatId != null) {
chatIds.add(chatId);
}
String msg = "流式响应(Streaming Response)是一种数据传输方式,通常用于网络通信和数据处理中。在这种模式下,数据不是一次性发送或接收,而是以连续的流的形式进行传输。";
//流式响应数据
StreamingResponseBody responseBody = outputStream -> {
//循环msg每个字
for (int i = 0; i < msg.length(); i++) {
//停止对话,则退出
if (chatId != null && !chatIds.contains(chatId)) {
break;
}
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {
}
String text = String.valueOf(msg.charAt(i));
PrintWriter writer = new PrintWriter(outputStream, true);
writer.println(text);
// 刷新输出流,确保数据被发送
writer.flush();
}
//删除记录对话id
chatIds.remove(chatId);
};
return ResponseEntity.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(responseBody);
}
/**
* 停止对话接口
*/
@GetMapping("/v1/chat/stop/{chatId}")
public void chatStop(@PathVariable String chatId) {
if (chatId == null) {
return;
}
chatIds.remove(chatId);
}
}
- Html页面代码
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>流式响应demo</title>
</head>
<body>
<button id="startBtn" onclick="start()">开始请求</button>
<button onclick="stop('test')">停止请求</button>
<p id="text-all"></p>
</body>
</html>
<script>
//用于存储当前的AbortController
let currentController = null;
// 调用POST接口的函数
async function start() {
//禁用按钮
document.getElementById("startBtn").disabled = true;
//取消上一个请求
if (currentController) {
console.log("取消请求");
currentController.abort();
}
// 创建新的AbortController实例
currentController = new AbortController();
//清空内容
document.getElementById("text-all").innerHTML = "";
// 使用fetch发送POST请求
const response = await fetch('http://localhost:8080/v1/chat/completions', {
method: 'POST', // 请求方法
headers: {
'Content-Type': 'application/json', // 设置请求头
},
body: JSON.stringify({"chatId": "test"}), // 请求体
signal: currentController.signal
});
// 获取流式响应数据
const reader = response.body.getReader();
// 读取流数据
while (true) {
const {done, value} = await reader.read();
if (done) {
break;
}
// 将流数据转换为字符串
const text = new TextDecoder().decode(value);
document.getElementById("text-all").innerHTML += text.trim();
}
document.getElementById("startBtn").disabled = false;
}
//停止
function stop(chatId) {
// 使用fetch发送GET请求
fetch('http://localhost:8080/v1/chat/stop/' + chatId, {method: 'GET'});
}
</script>