shellStreamLines function

Stream<String> shellStreamLines(
  1. Stream<List<int>> stream, {
  2. Encoding? encoding,
})

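Basic line streaming: splits a byte stream into lines and decodes each line to a string. When no encoding is given, the shell context's encoding is used.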

Implementation

/// Splits a byte [stream] into lines and emits each one decoded as a string.
///
/// A line ends at `\n`, `\r` or `\r\n`; the terminator is not part of the
/// emitted string. Bytes are decoded with [encoding], which falls back to
/// `shellContext.encoding` when omitted. A line that cannot be decoded is
/// printed and skipped instead of being reported as a stream error.
///
/// The source [stream] is only listened to once the returned stream gets a
/// listener. Pausing the returned stream first flushes whatever partial line
/// is buffered, then pauses the source; lines completed while the source
/// subscription is paused are discarded. Errors from [stream] are forwarded
/// unchanged, and a trailing unterminated line is emitted when [stream] ends.
Stream<String> shellStreamLines(
  Stream<List<int>> stream, {
  Encoding? encoding,
}) {
  encoding ??= shellContext.encoding;
  StreamSubscription? subscription;
  // Bytes of the line currently being assembled (null when nothing buffered).
  List<int>? currentLine;
  const lineFeed = 10; // \n LF
  const carriageReturn = 13; // \r CR
  late StreamController<String> ctlr;

  // Decodes and emits the buffered line, then resets the buffer.
  void addCurrentLine() {
    // Detach the buffer before emitting: the controller is synchronous, so a
    // listener that pauses from inside its data handler re-enters this
    // function through onPause before `ctlr.add` returns. Clearing first
    // guarantees the same bytes are never emitted twice.
    final line = currentLine;
    currentLine = null;
    if (line == null || (subscription?.isPaused ?? false)) {
      // Nothing buffered, or the source is paused and the line is discarded.
      return;
    }
    try {
      ctlr.add(encoding!.decode(line));
    } catch (_) {
      // Ignore bad encoding
      // ignore: avoid_print
      print('ignoring: $line');
    }
  }

  ctlr = StreamController<String>(
    onPause: () {
      if (shellDebug) {
        // ignore: avoid_print
        print('onPause (paused: ${subscription?.isPaused})');
      }
      // Flush the partial line before the source stops delivering data.
      addCurrentLine();
      subscription?.pause();
    },
    onResume: () {
      if (subscription?.isPaused ?? false) {
        subscription?.resume();
      }
    },
    onListen: () {
      // Appends [data] to the buffered line, allocating a new byte list when
      // there is already buffered content.
      void addToCurrentLine(List<int> data) {
        if (currentLine == null) {
          currentLine = data;
        } else {
          var newCurrentLine = Uint8List(currentLine!.length + data.length);
          newCurrentLine.setAll(0, currentLine!);
          newCurrentLine.setAll(currentLine!.length, data);
          currentLine = newCurrentLine;
        }
      }

      // Kept across chunks so that a CR ending one chunk and an LF starting
      // the next are still treated as a single CRLF terminator.
      var lastWasCR = false;
      subscription = stream.listen(
        (data) {
          var paused = subscription?.isPaused ?? false;
          if (paused) {
            return;
          }
          // Index of the first byte not yet assigned to a line.
          var start = 0;

          for (var i = 0; i < data.length; i++) {
            var byte = data[i];
            if (byte == lineFeed || byte == carriageReturn) {
              if (byte == lineFeed) {
                // The LF of a CRLF pair: the line was already emitted on CR.
                if (lastWasCR) {
                  lastWasCR = false;
                  start = i + 1;
                  continue;
                }
              } else {
                lastWasCR = true;
              }
              addToCurrentLine(data.sublist(start, i));
              addCurrentLine();

              // Skip the terminator itself.
              start = i + 1;
            } else {
              lastWasCR = false;
            }
          }
          // Keep the unterminated remainder for the next chunk.
          if (data.length > start) {
            addToCurrentLine(data.sublist(start, data.length));
          }
        },
        onDone: () {
          // Emit the trailing line that had no terminator, if any.
          addCurrentLine();
          ctlr.close();
        },
        onError: (Object e, StackTrace st) {
          ctlr.addError(e, st);
        },
      );
    },
    // Return the cancel future so that whoever cancels the returned stream
    // waits for (and sees errors from) the source cancellation.
    onCancel: () => subscription?.cancel(),
    sync: true,
  );

  return ctlr.stream;
}