Flutter 中未从服务器向客户端发送 SSE 接收数据

问题描述 投票:0回答:1

我正在尝试从服务器接收数据,该服务器使用服务器发送事件 (SSE) 每 5 秒向浏览器发送一次更新。我的目标是在我的 Flutter 应用程序中捕获该数据并在函数中生成它。

到目前为止我所做的:

我正在使用 flutter_client_sse 包来订阅 SSE 流。以下是我正在使用的代码:

Stream<dynamic> fetchRoutingStream() async* {
    // Fetch the token asynchronously
    SharedPrefsRepository sharedPrefsRepository =
        SharedPrefsRepository(provider: SharedPrefsProvider());
    String token = await sharedPrefsRepository.getAccessToken();

    final url = 'https://example.com/api/v1/web/route/';

    // Subscribe to SSE
    SSEClient.subscribeToSSE(
        method: SSERequestType.GET,
        url: url,
        header: {'Authorization': 'Bearer $token'}
    ).listen(
        (event) {
            // Log the event details
            print('Id: ' + (event.id ?? ''));
            print('Event: ' + (event.event ?? ''));
            print('Data: ' + (event.data ?? ''));
        },
    );
}

问题:

问题是代码似乎永远不会到达

.listen()
部分。侦听器内的打印语句永远不会被执行。

我已经确认 API 每 5 秒发送一次数据(使用浏览器开发人员工具验证)。看来连接没有正确建立,或者我在 Flutter 中的 SSE 设置中遗漏了一些东西。

我的问题:

为什么监听器不工作并且无法打印从 SSE 收到的任何数据?

SSE 订阅可能存在什么问题,或者我应该如何调试?


screenshot

补充说明:

我附上了一张屏幕截图,显示数据确实每 5 秒发送到 API 一次。

任何帮助或建议将不胜感激!

flutter dart server-sent-events
1个回答
0
投票

我最近也遇到了同样的问题,想和大家分享一下我的经验和解决方案。

这是使用任何浏览器(Chrome、Firefox、Edge 等)时都会出现的常见问题。

有趣的是,如果您在移动设备上测试相同的功能,它通常可以正常工作。

为了确保解决方案在 Web 和移动平台上无缝运行,您可以使用以下解决方法:

// sse_client.dart
import 'package:http/http.dart' as http;

Future<Stream<String>> getSSEStream(Uri url, Map<String, String> headers) async {
  final request = http.Request('GET', url);
  request.headers.addAll(headers);
  return (await http.Client().send(request)).stream.transform(utf8.decoder);
}

// sse_client_web.dart
import 'package:fetch_client/fetch_client.dart';
import 'package:http/http.dart' as http;

Future<Stream<String>> getSSEStream(Uri url, Map<String, String> headers) async {
  final fetchClient = FetchClient(mode: RequestMode.cors);
  final request = http.Request('GET', url);
  request.headers.addAll(headers);
  final response = await fetchClient.send(request);
  return response.stream.transform(utf8.decoder);
}


// 1. IMPORTANT!! The key part of all is this
import 'sse_client.dart' if (dart.library.js) 'sse_client_web.dart'; // Will be used on main.dart

// 2. Notice something important, im using mercure for this example in
static const String _hubUrl = 'https://mercure.your-application-baseurl/.well-known/mercure';

// main.dart Full test file
// ignore_for_file: avoid_print
import 'dart:async';
import 'dart:convert';
import 'package:dart_jsonwebtoken/dart_jsonwebtoken.dart';
import 'package:flutter/material.dart';
import 'package:flutter_client_sse/flutter_client_sse.dart';
import 'sse_client.dart' if (dart.library.js) 'sse_client_web.dart';

void main() {
  runApp(const MyApp());
}

class MyApp extends StatelessWidget {
  const MyApp({super.key});

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      debugShowCheckedModeBanner: false,
      title: 'Flutter Mercure SSE Demo',
      theme: ThemeData(primarySwatch: Colors.blue),
      home: const MercureSSEDemo(),
    );
  }
}

// Modelo para las notificaciones
class NotificationMessage {
  final String message;
  final DateTime timestamp;

  NotificationMessage({
    required this.message,
    required this.timestamp,
  });

  factory NotificationMessage.fromSSE(SSEModel sseModel) {
    print('Datos recibidos SSE: ${sseModel.data}');
    
    try {
      // Eliminar el último carácter de nueva línea si existe
      final data = (sseModel.data ?? '').trimRight();
      
      // Intentar decodificar el JSON si viene en ese formato
      Map<String, dynamic>? jsonData;
      try {
        jsonData = json.decode(data);
        print('JSON decodificado: $jsonData');
      } catch (e) {
        print('No es JSON válido, usando datos en bruto');
      }

      return NotificationMessage(
        message: jsonData?['message'] ?? data,
        timestamp: DateTime.now(),
      );
    } catch (e) {
      print('Error procesando mensaje: $e');
      return NotificationMessage(
        message: 'Error al procesar mensaje: ${sseModel.data}',
        timestamp: DateTime.now(),
      );
    }
  }
}

// Service para manejar la lógica de Mercure
class MercureService {
  static const String _hubUrl = 'https://mercure.your-application-baseurl/.well-known/mercure';
  static const String _jwtSecret = '!ChangeThisMercureHubJWTSecretKey!';
  
  static final List<String> _topics = [
    'https://your-application-baseurl/notifications/all',
  ];

  static String generateJWT() {
    final jwt = JWT(
      {
        'mercure': {
          'subscribe': ['*'],
          'publish': []
        }
      },
      header: {'alg': 'HS256'},
    );

    return jwt.sign(
      SecretKey(_jwtSecret),
      algorithm: JWTAlgorithm.HS256,
    );
  }

  static Uri getSubscriptionUrl() {
    return Uri.parse(_hubUrl).replace(
      queryParameters: {'topic': _topics},
    );
  }

  static Future<Stream<String>> subscribe() async {
    final url = getSubscriptionUrl();
    final headers = {
      'Authorization': 'Bearer ${generateJWT()}',
      'Accept': 'text/event-stream',
      'Cache-Control': 'no-cache',
    };

    return getSSEStream(url, headers);
  }
}

class MercureSSEDemo extends StatefulWidget {
  const MercureSSEDemo({super.key});

  @override
  State<MercureSSEDemo> createState() => _MercureSSEDemoState();
}

class _MercureSSEDemoState extends State<MercureSSEDemo> {
  late StreamSubscription<String> _sseSubscription;
  final List<NotificationMessage> _notifications = [];
  bool _isConnected = false;
  final _reconnectInterval = const Duration(seconds: 5);
  int _reconnectAttempts = 0;
  static const int _maxReconnectAttempts = 5;

  @override
  void initState() {
    super.initState();
    _initSSE();
  }

  void _initSSE() async {
    try {
      final stream = await MercureService.subscribe();
      _sseSubscription = stream.listen(
        (data) {
          print('Datos recibidos: $data');
          // Procesar los datos...
          final notification = NotificationMessage(
            message: data,
            timestamp: DateTime.now(),
          );
          setState(() {
            _notifications.insert(0, notification);
            _isConnected = true;
          });
        },
        onError: _handleError,
        onDone: _handleConnectionClosed,
      );
      setState(() => _isConnected = true);
      _reconnectAttempts = 0;
    } catch (e) {
      _handleError(e);
    }
  }

  void _handleError(dynamic error) {
    print('002');

    if (!mounted) return;

    print('Error en SSE: $error');
    setState(() => _isConnected = false);
    _attemptReconnect();
  }

  void _handleConnectionClosed() {
    print('003');

    if (!mounted) return;

    setState(() => _isConnected = false);
    _attemptReconnect();
  }

  void _attemptReconnect() {
    if (_reconnectAttempts >= _maxReconnectAttempts) {
      print('Máximo número de intentos de reconexión alcanzado');
      return;
    }

    Future.delayed(_reconnectInterval, () {
      if (mounted && !_isConnected) {
        _reconnectAttempts++;
        print('Intento de reconexión $_reconnectAttempts de $_maxReconnectAttempts');
        _initSSE();
      }
    });
  }

  @override
  void dispose() {
    _sseSubscription.cancel();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: const Text('Mercure SSE Demo'),
        actions: [
          ConnectionStatusIndicator(isConnected: _isConnected),
        ],
      ),
      body: NotificationsList(notifications: _notifications),
    );
  }
}

class ConnectionStatusIndicator extends StatelessWidget {
  final bool isConnected;

  const ConnectionStatusIndicator({
    super.key,
    required this.isConnected,
  });

  @override
  Widget build(BuildContext context) {
    return Container(
      margin: const EdgeInsets.all(16.0),
      width: 16,
      height: 16,
      decoration: BoxDecoration(
        shape: BoxShape.circle,
        color: isConnected ? Colors.green : Colors.red,
      ),
    );
  }
}

class NotificationsList extends StatelessWidget {
  final List<NotificationMessage> notifications;

  const NotificationsList({
    super.key,
    required this.notifications,
  });

  @override
  Widget build(BuildContext context) {
    return ListView.builder(
      itemCount: notifications.length,
      itemBuilder: (context, index) {
        final notification = notifications[index];
        return ListTile(
          title: Text(notification.message),
          subtitle: Text(
            notification.timestamp.toLocal().toString(),
            style: Theme.of(context).textTheme.bodySmall,
          ),
        );
      },
    );
  }
}

我希望这有帮助!如果您需要进一步说明或遇到任何其他问题,请告诉我!

© www.soinside.com 2019 - 2024. All rights reserved.