completeStream method
Sends a completion request and returns a stream of chunks.
Implementation
@override
Stream<AIStreamChunk> completeStream(
List<AIMessage> messages, {
int? maxTokens,
double? temperature,
List<AITool>? tools,
}) async* {
final body =
_buildRequestBody(messages, maxTokens, temperature, stream: true);
final request = http.Request('POST', Uri.parse('$_baseUrl/messages'));
request.headers.addAll(_headers());
request.body = jsonEncode(body);
final streamedResponse = await _httpClient.send(request);
if (streamedResponse.statusCode != 200) {
final responseBody = await streamedResponse.stream.bytesToString();
final json = jsonDecode(responseBody) as Map<String, dynamic>;
throw _parseError(streamedResponse.statusCode, json);
}
String currentEvent = '';
await for (final line in streamedResponse.stream
.transform(utf8.decoder)
.transform(const LineSplitter())) {
if (line.startsWith('event: ')) {
currentEvent = line.substring(7);
continue;
}
if (!line.startsWith('data: ')) continue;
final jsonStr = line.substring(6);
try {
final json = jsonDecode(jsonStr) as Map<String, dynamic>;
switch (currentEvent) {
case 'content_block_delta':
final delta = json['delta'] as Map<String, dynamic>?;
if (delta?['type'] == 'text_delta') {
final text = delta?['text'] as String? ?? '';
if (text.isNotEmpty) {
yield AIStreamChunk(
text: text,
provider: name,
model: config.model,
);
}
}
break;
case 'message_delta':
final usage = json['usage'] as Map<String, dynamic>?;
yield AIStreamChunk(
text: '',
isComplete: true,
finishReason: json['delta']?['stop_reason'] as String?,
usage: usage != null
? AIUsage(
promptTokens: 0,
completionTokens: usage['output_tokens'] as int? ?? 0,
)
: null,
provider: name,
model: config.model,
);
break;
}
} on FormatException catch (_) {
// Skip malformed JSON chunks
}
}
}