我正在尝试从服务器接收数据,该服务器使用服务器发送事件 (SSE) 每 5 秒向浏览器发送一次更新。我的目标是在我的 Flutter 应用程序中捕获该数据并在函数中生成它。
到目前为止我所做的:
我正在使用 flutter_client_sse 包来订阅 SSE 流。以下是我正在使用的代码:
Stream<dynamic> fetchRoutingStream() async* {
// Fetch the token asynchronously
SharedPrefsRepository sharedPrefsRepository =
SharedPrefsRepository(provider: SharedPrefsProvider());
String token = await sharedPrefsRepository.getAccessToken();
final url = 'https://example.com/api/v1/web/route/';
// Subscribe to SSE
SSEClient.subscribeToSSE(
method: SSERequestType.GET,
url: url,
header: {'Authorization': 'Bearer $token'}
).listen(
(event) {
// Log the event details
print('Id: ' + (event.id ?? ''));
print('Event: ' + (event.event ?? ''));
print('Data: ' + (event.data ?? ''));
},
);
}
问题:
问题是代码似乎永远不会到达
.listen()
部分。侦听器内的打印语句永远不会被执行。
我已经确认 API 每 5 秒发送一次数据(使用浏览器开发人员工具验证)。看来连接没有正确建立,或者我在 Flutter 中的 SSE 设置中遗漏了一些东西。
我的问题:
为什么监听器不工作并且无法打印从 SSE 收到的任何数据?
SSE 订阅可能存在什么问题,或者我应该如何调试?
补充说明:
我附上了一张屏幕截图,显示数据确实每 5 秒发送到 API 一次。
任何帮助或建议将不胜感激!
我最近也遇到了同样的问题,想和大家分享一下我的经验和解决方案。
这是使用任何浏览器(Chrome、Firefox、Edge 等)时都会出现的常见问题。
有趣的是,如果您在移动设备上测试相同的功能,它通常可以正常工作。
为了确保解决方案在 Web 和移动平台上无缝运行,您可以使用以下解决方法:
// sse_client.dart
import 'package:http/http.dart' as http;
Future<Stream<String>> getSSEStream(Uri url, Map<String, String> headers) async {
final request = http.Request('GET', url);
request.headers.addAll(headers);
return (await http.Client().send(request)).stream.transform(utf8.decoder);
}
// sse_client_web.dart
import 'package:fetch_client/fetch_client.dart';
import 'package:http/http.dart' as http;
Future<Stream<String>> getSSEStream(Uri url, Map<String, String> headers) async {
final fetchClient = FetchClient(mode: RequestMode.cors);
final request = http.Request('GET', url);
request.headers.addAll(headers);
final response = await fetchClient.send(request);
return response.stream.transform(utf8.decoder);
}
// 1. IMPORTANT!! The key part of all is this
import 'sse_client.dart' if (dart.library.js) 'sse_client_web.dart'; // Will be used on main.dart
// 2. Notice something important, im using mercure for this example in
static const String _hubUrl = 'https://mercure.your-application-baseurl/.well-known/mercure';
// main.dart Full test file
// ignore_for_file: avoid_print
import 'dart:async';
import 'dart:convert';
import 'package:dart_jsonwebtoken/dart_jsonwebtoken.dart';
import 'package:flutter/material.dart';
import 'package:flutter_client_sse/flutter_client_sse.dart';
import 'sse_client.dart' if (dart.library.js) 'sse_client_web.dart';
void main() {
runApp(const MyApp());
}
class MyApp extends StatelessWidget {
const MyApp({super.key});
@override
Widget build(BuildContext context) {
return MaterialApp(
debugShowCheckedModeBanner: false,
title: 'Flutter Mercure SSE Demo',
theme: ThemeData(primarySwatch: Colors.blue),
home: const MercureSSEDemo(),
);
}
}
// Modelo para las notificaciones
class NotificationMessage {
final String message;
final DateTime timestamp;
NotificationMessage({
required this.message,
required this.timestamp,
});
factory NotificationMessage.fromSSE(SSEModel sseModel) {
print('Datos recibidos SSE: ${sseModel.data}');
try {
// Eliminar el último carácter de nueva línea si existe
final data = (sseModel.data ?? '').trimRight();
// Intentar decodificar el JSON si viene en ese formato
Map<String, dynamic>? jsonData;
try {
jsonData = json.decode(data);
print('JSON decodificado: $jsonData');
} catch (e) {
print('No es JSON válido, usando datos en bruto');
}
return NotificationMessage(
message: jsonData?['message'] ?? data,
timestamp: DateTime.now(),
);
} catch (e) {
print('Error procesando mensaje: $e');
return NotificationMessage(
message: 'Error al procesar mensaje: ${sseModel.data}',
timestamp: DateTime.now(),
);
}
}
}
// Service para manejar la lógica de Mercure
class MercureService {
static const String _hubUrl = 'https://mercure.your-application-baseurl/.well-known/mercure';
static const String _jwtSecret = '!ChangeThisMercureHubJWTSecretKey!';
static final List<String> _topics = [
'https://your-application-baseurl/notifications/all',
];
static String generateJWT() {
final jwt = JWT(
{
'mercure': {
'subscribe': ['*'],
'publish': []
}
},
header: {'alg': 'HS256'},
);
return jwt.sign(
SecretKey(_jwtSecret),
algorithm: JWTAlgorithm.HS256,
);
}
static Uri getSubscriptionUrl() {
return Uri.parse(_hubUrl).replace(
queryParameters: {'topic': _topics},
);
}
static Future<Stream<String>> subscribe() async {
final url = getSubscriptionUrl();
final headers = {
'Authorization': 'Bearer ${generateJWT()}',
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
};
return getSSEStream(url, headers);
}
}
class MercureSSEDemo extends StatefulWidget {
const MercureSSEDemo({super.key});
@override
State<MercureSSEDemo> createState() => _MercureSSEDemoState();
}
class _MercureSSEDemoState extends State<MercureSSEDemo> {
late StreamSubscription<String> _sseSubscription;
final List<NotificationMessage> _notifications = [];
bool _isConnected = false;
final _reconnectInterval = const Duration(seconds: 5);
int _reconnectAttempts = 0;
static const int _maxReconnectAttempts = 5;
@override
void initState() {
super.initState();
_initSSE();
}
void _initSSE() async {
try {
final stream = await MercureService.subscribe();
_sseSubscription = stream.listen(
(data) {
print('Datos recibidos: $data');
// Procesar los datos...
final notification = NotificationMessage(
message: data,
timestamp: DateTime.now(),
);
setState(() {
_notifications.insert(0, notification);
_isConnected = true;
});
},
onError: _handleError,
onDone: _handleConnectionClosed,
);
setState(() => _isConnected = true);
_reconnectAttempts = 0;
} catch (e) {
_handleError(e);
}
}
void _handleError(dynamic error) {
print('002');
if (!mounted) return;
print('Error en SSE: $error');
setState(() => _isConnected = false);
_attemptReconnect();
}
void _handleConnectionClosed() {
print('003');
if (!mounted) return;
setState(() => _isConnected = false);
_attemptReconnect();
}
void _attemptReconnect() {
if (_reconnectAttempts >= _maxReconnectAttempts) {
print('Máximo número de intentos de reconexión alcanzado');
return;
}
Future.delayed(_reconnectInterval, () {
if (mounted && !_isConnected) {
_reconnectAttempts++;
print('Intento de reconexión $_reconnectAttempts de $_maxReconnectAttempts');
_initSSE();
}
});
}
@override
void dispose() {
_sseSubscription.cancel();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text('Mercure SSE Demo'),
actions: [
ConnectionStatusIndicator(isConnected: _isConnected),
],
),
body: NotificationsList(notifications: _notifications),
);
}
}
class ConnectionStatusIndicator extends StatelessWidget {
final bool isConnected;
const ConnectionStatusIndicator({
super.key,
required this.isConnected,
});
@override
Widget build(BuildContext context) {
return Container(
margin: const EdgeInsets.all(16.0),
width: 16,
height: 16,
decoration: BoxDecoration(
shape: BoxShape.circle,
color: isConnected ? Colors.green : Colors.red,
),
);
}
}
class NotificationsList extends StatelessWidget {
final List<NotificationMessage> notifications;
const NotificationsList({
super.key,
required this.notifications,
});
@override
Widget build(BuildContext context) {
return ListView.builder(
itemCount: notifications.length,
itemBuilder: (context, index) {
final notification = notifications[index];
return ListTile(
title: Text(notification.message),
subtitle: Text(
notification.timestamp.toLocal().toString(),
style: Theme.of(context).textTheme.bodySmall,
),
);
},
);
}
}
我希望这有帮助!如果您需要进一步说明或遇到任何其他问题,请告诉我!