IsolateChannel<T>.connectReceive constructor

IsolateChannel<T>.connectReceive(
  1. ReceivePort receivePort
)

Connects to a remote channel that was created with IsolateChannel.connectSend.

These constructors establish a connection using only a single SendPort/ReceivePort pair, as long as each side uses one of the connect constructors.

The connection protocol is guaranteed to remain compatible across versions at least until the next major version release. If the protocol is violated, the resulting channel will emit a single value on its stream and then close.

Implementation

factory IsolateChannel.connectReceive(ReceivePort receivePort) {
  // We can't use a [StreamChannelCompleter] here because we need the return
  // value to be an [IsolateChannel].
  var isCompleted = false;
  var streamCompleter = StreamCompleter<T>();
  var sinkCompleter = StreamSinkCompleter<T>();

  var channel = IsolateChannel<T>._(streamCompleter.stream, sinkCompleter.sink
      .transform(StreamSinkTransformer.fromHandlers(handleDone: (sink) {
    if (!isCompleted) {
      receivePort.close();
      streamCompleter.setSourceStream(const Stream.empty());
      sinkCompleter.setDestinationSink(NullStreamSink<T>());
    }
    sink.close();
  })));

  // The first message across the ReceivePort should be a SendPort pointing to
  // the remote end. If it's not, we'll make the stream emit an error
  // complaining.
  late StreamSubscription<dynamic> subscription;
  subscription = receivePort.listen((message) {
    isCompleted = true;
    if (message is SendPort) {
      var controller =
          StreamChannelController<T>(allowForeignErrors: false, sync: true);
      SubscriptionStream(subscription).cast<T>().pipe(controller.local.sink);
      controller.local.stream
          .listen((data) => message.send(data), onDone: receivePort.close);

      streamCompleter.setSourceStream(controller.foreign.stream);
      sinkCompleter.setDestinationSink(controller.foreign.sink);
      return;
    }

    streamCompleter.setError(
        StateError('Unexpected Isolate response "$message".'),
        StackTrace.current);
    sinkCompleter.setDestinationSink(NullStreamSink<T>());
    subscription.cancel();
  });

  return channel;
}