executeObservable method
Stream executeObservable({
  required String methodName,
  Map<String, dynamic> arguments = const {},
})
override
Implementation
/// Subscribes to the observable method [methodName] and returns its values
/// as a stream.
///
/// A fresh request id is generated for every call. The returned stream
/// carries the `value` of each event on `_observableCallbackController`
/// whose `requestId` equals that id, merged with the outcome of the
/// `Action.subscribe` invocation on `_channel`: a successful reply adds no
/// event, while a failed invocation surfaces as an error event.
///
/// [arguments] is forwarded unchanged under the `"arguments"` key of the
/// invocation payload.
///
/// The subscribe invocation is issued as soon as this method is called, not
/// when the returned stream is first listened to. When the returned stream
/// is cancelled, `_cancelOperation` is called with the request id.
@override
Stream executeObservable({
  required String methodName,
  Map<String, dynamic> arguments = const {},
}) {
  final id = _generateRequestId();

  // Only events tagged with this call's id belong to this subscription.
  final values = _observableCallbackController.stream
      .where((event) => event.requestId == id)
      .map((match) => match.value);

  final payload = {
    "requestId": id,
    "methodName": methodName,
    "methodType": MethodType.observable.name,
    "arguments": arguments,
  };

  final subscribeOutcome = _channel
      .invokeMethod(Action.subscribe.name, payload)
      .asStream()
      .transform(StreamTransformer.fromHandlers(
        handleData: (_, sink) {
          // The reply itself is not a value event; it is dropped and this
          // side of the merge simply ends.
          sink.close();
        },
        handleError: (err, trace, sink) {
          // Forward the invocation failure to the listener, then end.
          sink.addError(err, trace);
          sink.close();
        },
        handleDone: (sink) {
          sink.close();
        },
      ));

  final merged = values.mergeWith([subscribeOutcome]);
  return merged.doOnCancel(() {
    _cancelOperation(id);
  });
}