executeObservable method

@override
Stream executeObservable({
  required String methodName,
  Map<String, dynamic> arguments = const {},
})
override

Implementation

/// Subscribes to the observable [methodName] and streams its values.
///
/// A fresh request id is generated for every call. The returned stream
/// carries the `value` of each event from `_observableCallbackController`
/// whose `requestId` matches that id, merged with a value-less stream that
/// only surfaces a failure of the subscribe invocation itself.
///
/// The subscribe call is issued on `_channel` as soon as this method is
/// invoked, not when the returned stream is first listened to. [arguments]
/// are passed through unchanged under the `"arguments"` key.
///
/// Cancelling the subscription to the returned stream calls
/// `_cancelOperation` with the request id.
@override
Stream executeObservable({
  required String methodName,
  Map<String, dynamic> arguments = const {},
}) {
  final requestId = _generateRequestId();

  // Only events tagged with this call's request id belong to this stream.
  final resultStream = _observableCallbackController.stream
      .where((data) => data.requestId == requestId)
      .map((event) => event.value);

  // The invocation result itself is not a value of the observable: it is
  // dropped, and only an error from the call is forwarded to the listener.
  final commandStream = _channel
      .invokeMethod(
        Action.subscribe.name,
        {
          "requestId": requestId,
          "methodName": methodName,
          "methodType": MethodType.observable.name,
          "arguments": arguments,
        },
      )
      .asStream()
      .transform(StreamTransformer.fromHandlers(
        // A Future-backed stream always sends done right after its single
        // data or error event, so the sink is closed exactly once, in
        // handleDone, instead of also being closed in the event handlers.
        handleData: (_, __) {},
        handleError: (error, stackTrace, sink) =>
            sink.addError(error, stackTrace),
        handleDone: (sink) => sink.close(),
      ));

  return resultStream.mergeWith([commandStream]).doOnCancel(() {
    _cancelOperation(requestId);
  });
}