squadron_builder 2.3.0 copy "squadron_builder: ^2.3.0" to clipboard
squadron_builder: ^2.3.0 copied to clipboard

Dart code generator for Squadron workers. Implement your worker service and let squadron_builder bridge the gap with Web Workers and Isolates!

example/main.dart

import 'dart:async';

import 'package:squadron/squadron.dart';

import 'echo_service.dart';
import 'fibonacci_service.dart';
import 'service_config.dart';
import 'service_request.dart';
import 'perf_counters.dart';
import 'deviation_monitor.dart';

void main() async {
  setupSquadron();

  // start a periodic timer to measure timer deviation while executing the different scenarios.
  final resolution = Duration(milliseconds: 20);
  final deviationMonitor = await installDeviationMonitor(resolution);

  // service parameters
  final trace = false;
  final workload = ServiceConfig('workload', 250);

  // FIRST RUN: single-threaded (all in main thread)
  final serviceCounters = await runServices(deviationMonitor, trace, workload);

  // SECOND RUN: one worker (= main thread + 1 worker thread)
  final workerCounters = await runWorkers(deviationMonitor, trace, workload);

  // THIRD RUN: worker pool (= main thread + n worker threads)
  final poolCounters = await runPools(deviationMonitor, trace, workload);

  // print summary
  displaySummary(resolution, serviceCounters, workerCounters, poolCounters);
}

void setupSquadron() {
  Squadron.setId('MAIN');
  Squadron.setLogger(ConsoleSquadronLogger());
  Squadron.debugMode = false;
  Squadron.logLevel = SquadronLogLevel.info;
}

Future<DeviationMonitor> installDeviationMonitor(Duration resolution) async {
  final deviationMonitor = DeviationMonitor(resolution);

  // test timer deviation monitor
  deviationMonitor.start();
  final impactLevel = 10;

  // asynchronous, should have no impact
  await Future.delayed(resolution * impactLevel);
  // synchronous, should trigger a deviation by approx (impactLevel - 1) * 100 % (eg. ~900% for impactLevel = 10)
  final sw = Stopwatch()..start();
  while (sw.elapsedMilliseconds < resolution.inMilliseconds * impactLevel) {
    // CPU loop
  }
  // asynchronous, should have no impact
  await Future.delayed(resolution * impactLevel);

  deviationMonitor.stop();
  return deviationMonitor;
}

void displaySummary(Duration resolution, PerfCounters serviceCounters,
    PerfCounters workerCounters, PerfCounters poolCounters) {
  final workerToService = workerCounters.percentTo(serviceCounters);
  final workerPoolToService = poolCounters.percentTo(serviceCounters);

  print('''
==== SUMMARY ====

MAX TIMER DELAY (resolution = $resolution aka ${1000 / resolution.inMilliseconds} frames/sec)
    * main thread: ${serviceCounters.maxDeviation} (${percent(resolution, serviceCounters.maxDeviation).toStringAsFixed(2)} %)
    * worker: ${workerCounters.maxDeviation} (${percent(resolution, workerCounters.maxDeviation).toStringAsFixed(2)} %)
    * worker pool: ${poolCounters.maxDeviation} (${percent(resolution, poolCounters.maxDeviation).toStringAsFixed(2)}) %

MAIN THREAD (baseline): executed in the main event loop.
  * Fib:  ${serviceCounters.fib}
  * Echo: ${serviceCounters.echo}
  * Perf: ${serviceCounters.perf}

SINGLE WORKERS vs MAIN THREAD: worker counters should be slightly worse because
of serialization/deserialization. The main advantage in this scenario is to
free the main event loop, eg in user-facing apps to avoid glitches in the UI.
  * Fib:  ${workerCounters.fib} --> ${workerToService['fib']?.toStringAsFixed(2)} %
  * Echo: ${workerCounters.echo} --> ${workerToService['echo']?.toStringAsFixed(2)} %
  * Perf: ${workerCounters.perf} --> ${workerToService['perf']?.toStringAsFixed(2)} %

WORKER POOL vs MAIN THREAD: worker pool counters should be much better even
considering the overhead of serialization/deserialization and worker scheduling.
Perf improvement depends on method execution time: the heavier the workload,
the more performance will be improved.
  * Fib:  ${poolCounters.fib} --> ${workerPoolToService['fib']?.toStringAsFixed(2)} %
  * Echo: ${poolCounters.echo} --> ${workerPoolToService['echo']?.toStringAsFixed(2)} %
  * Perf: ${poolCounters.perf} --> ${workerPoolToService['perf']?.toStringAsFixed(2)} %
''');
}

Future<PerfCounters> runServices(DeviationMonitor monitor, bool trace,
    ServiceConfig<int> workloadDelay) async {
  Squadron.info('''
----------------------------------------------------------
1. Computing with services (single-threaded)
----------------------------------------------------------
''');

  final fibonacciService = FibonacciService(trace: trace);
  final echoService = EchoService(trace, workloadDelay);

  monitor.start();
  var counters = await testWith(monitor, fibonacciService, echoService);
  await Future.delayed(Duration.zero);
  counters += await testWith(monitor, fibonacciService, echoService);
  monitor.stop();

  print('');
  return counters / 2;
}

void platformWorkerHook<T extends WorkerService>(PlatformWorker w) {
  Squadron.info(
      'Worker ready for $T (platform worker type = ${w.runtimeType})');
}

void displayStats(WorkerStat stats) {
  Squadron.info(
      '${stats.workerType} ${stats.id} (${stats.status}): totalWorkload=${stats.totalWorkload}, upTime=${stats.upTime}, idleTime=${stats.idleTime}');
}

Future<PerfCounters> runWorkers(DeviationMonitor monitor, bool trace,
    ServiceConfig<int> workloadDelay) async {
  Squadron.info('''
----------------------------------------------------------
2. Computing with single service workers (one thread each)
----------------------------------------------------------
''');

  final fibonacciWorker = FibonacciServiceWorker(
      trace: trace,
      platformWorkerHook: (w) => platformWorkerHook<FibonacciServiceWorker>(w));
  final echoWorker = EchoServiceWorker(
      trace, workloadDelay, (w) => platformWorkerHook<EchoServiceWorker>(w));

  await Future.wait([fibonacciWorker.start(), echoWorker.start()]);

  monitor.start();
  var counters = await testWith(monitor, fibonacciWorker, echoWorker);
  await Future.delayed(Duration.zero);
  counters += await testWith(monitor, fibonacciWorker, echoWorker);
  monitor.stop();

  displayStats(fibonacciWorker.stats);
  displayStats(echoWorker.stats);

  // clean up workers... should not be necessary if with_finalizers was set to true when the code was generated
  fibonacciWorker.stop();
  echoWorker.stop();

  print('');
  return counters / 2;
}

Future<PerfCounters> runPools(DeviationMonitor monitor, bool trace,
    ServiceConfig<int> workloadDelay) async {
  Squadron.info('''
----------------------------------------------------------
3. Computing with service worker pools (multithreaded)
----------------------------------------------------------
''');

  final concurrency = ConcurrencySettings(
    minWorkers: 2,
    maxWorkers: 6,
    maxParallel: 1,
  );
  final fibonacciPool = FibonacciServiceWorkerPool(
      trace: trace,
      concurrencySettings: concurrency,
      platformWorkerHook: (w) =>
          platformWorkerHook<FibonacciServiceWorkerPool>(w));
  final echoPool = EchoServiceWorkerPool(trace, workloadDelay, concurrency,
      (w) => platformWorkerHook<EchoServiceWorkerPool>(w));

  await Future.wait(
      [fibonacciPool.start().toFuture(), echoPool.start().toFuture()]);

  monitor.start();
  var counters = await testWith(monitor, fibonacciPool, echoPool);
  await Future.delayed(Duration.zero);
  counters += await testWith(monitor, fibonacciPool, echoPool);
  monitor.stop();

  await Future.delayed(Duration(milliseconds: 500));

  fibonacciPool.stop((w) => w.idleTime.inMilliseconds > 400);
  fibonacciPool.fullStats.forEach(displayStats);

  echoPool.stop((w) => w.idleTime.inMilliseconds > 400);
  echoPool.fullStats.forEach(displayStats);

  // clean up pools... should not be necessary if with_finalizers was set to true when the code was generated
  fibonacciPool.stop();
  echoPool.stop();

  print('');
  return counters / 2;
}

Future<PerfCounters> testWith(DeviationMonitor monitor,
    FibonacciService fibonacciService, EchoService echoService) async {
  final sw = Stopwatch()..start();
  await testFibWith(fibonacciService);
  final fib = sw.elapsed;
  Squadron.info('  --> ELAPSED(fib) = $fib');
  sw.reset();
  await testEchoWith(echoService);
  final echo = sw.elapsed;
  Squadron.info('  --> ELAPSED(echo) = $echo');
  sw.reset();
  await perfTestEchoWith(echoService);
  await perfTestJsonEchoWith(echoService);
  final perf = sw.elapsed;
  Squadron.info('  --> ELAPSED(perf) = $perf');
  return PerfCounters(fib, echo, perf, monitor.maxDelay.inMilliseconds);
}

Future<void> testFibWith(FibonacciService fibonacciService) async {
  final futures = <Future>[];

  for (var i = 0; i < 10; i++) {
    futures.add(fibonacciService.fibonacci(20 + i).toFuture().then((res) =>
        Squadron.fine(
            '[${fibonacciService.runtimeType}] fibonacci(${20 + i}) = $res')));
  }
  await Future.wait(futures);
  futures.clear();

  await Future.delayed(Duration.zero);

  for (var i = 0; i < 10; i++) {
    futures.add(fibonacciService.fibonacciList0(20 + i, 30 + i).toFuture().then(
        (res) => Squadron.fine(
            '[${fibonacciService.runtimeType}] fibonacciList0(${20 + i}, ${30 + i}) = $res')));
  }
  await Future.wait(futures);
  futures.clear();

  await Future.delayed(Duration.zero);

  for (var i = 0; i < 10; i++) {
    futures.add(fibonacciService.fibonacciList1(20 + i, 30 + i).toFuture().then(
        (res) => Squadron.fine(
            '[${fibonacciService.runtimeType}] fibonacciList1(${20 + i}, ${30 + i}) = $res')));
  }
  await Future.wait(futures);
  futures.clear();

  await Future.delayed(Duration.zero);

  for (var i = 0; i < 10; i++) {
    futures.add(fibonacciService.fibonacciList2(20 + i, 30 + i).toFuture().then(
        (res) => Squadron.fine(
            '[${fibonacciService.runtimeType}] fibonacciList2(${20 + i}, ${30 + i}) = $res')));
  }
  await Future.wait(futures);
  futures.clear();

  await Future.delayed(Duration.zero);

  for (var i = 0; i < 10; i++) {
    futures.add(fibonacciService
        .fibonacciStream(20 + i, end: 30 + i)
        .toList()
        .then((res) => Squadron.fine(
            '[${fibonacciService.runtimeType}] fibonacciStream(${20 + i}, ${30 + i}) = $res')));
  }
  await Future.wait(futures);
  futures.clear();

  await Future.delayed(Duration.zero);
}

const int loops = 500;

Future<void> testEchoWith(EchoService echoService) async {
  final futures = <Future>[];

  for (var i = 0; i < loops; i++) {
    futures.add(echoService
        .jsonEchoWithJsonResult(ServiceRequest('echo $i'))
        .toFuture()
        .then((res) => Squadron.fine(
            '[${echoService.runtimeType}] jsonEchoWithJsonResult(\'echo $i\') = ${res?.toJson()}')));
  }
  await Future.wait(futures);
  futures.clear();

  for (var i = 0; i < loops; i++) {
    futures.add(echoService
        .jsonEchoWithExplicitResult(ServiceRequest('echo $i'))
        .toFuture()
        .then((res) => Squadron.fine(
            '[${echoService.runtimeType}] jsonEchoWithExplicitResult(\'echo $i\') = ${res.toJson()}')));
  }
  await Future.wait(futures);
  futures.clear();

  for (var i = 0; i < loops; i++) {
    futures.add(echoService
        .explicitEchoWithJsonResult(ServiceRequest('echo $i'))
        .toFuture()
        .then((res) => Squadron.fine(
            '[${echoService.runtimeType}] explicitEchoWithJsonResult(\'echo $i\') = ${res.toJson()}')));
  }
  await Future.wait(futures);
  futures.clear();

  for (var i = 0; i < loops; i++) {
    futures.add(echoService
        .explicitEchoWithExplicitResult(ServiceRequest('echo $i'))
        .toFuture()
        .then((res) => Squadron.fine(
            '[${echoService.runtimeType}] explicitEchoWithExplicitResult(\'echo $i\') = ${res.toJson()}')));
  }
  await Future.wait(futures);
  futures.clear();
}

const int perfLoops = 5000;

Future<void> perfTestEchoWith(EchoService echoService) async {
  var futures = <Future>[];
  final jsonSw = Stopwatch()..start();
  for (var i = 0; i < perfLoops; i++) {
    final req = ServiceRequest('echo $i');
    futures.add(echoService.jsonEchoWithJsonResult(req).toFuture());
  }
  await Future.wait(futures);
  jsonSw.stop();

  futures = <Future>[];
  final explicitSw = Stopwatch()..start();
  for (var i = 0; i < perfLoops; i++) {
    final req = ServiceRequest('echo $i');
    futures.add(echoService.explicitEchoWithExplicitResult(req).toFuture());
  }
  await Future.wait(futures);
  explicitSw.stop();

  Squadron.info(
      '[${echoService.runtimeType}] json = ${jsonSw.elapsed} / explicit = ${explicitSw.elapsed}');
}

Future<void> perfTestJsonEchoWith(EchoService echoService) async {
  var futures = <Future>[];
  final jsonSw = Stopwatch()..start();
  for (var i = 0; i < perfLoops; i++) {
    final req = ServiceRequest('echo $i');
    futures.add(echoService.jsonEchoWithJsonResult(req).toFuture());
  }
  await Future.wait(futures);
  jsonSw.stop();

  futures = <Future>[];
  final jsonEncodeSw = Stopwatch()..start();
  for (var i = 0; i < perfLoops; i++) {
    final req = ServiceRequest('echo $i');
    futures.add(echoService.jsonEncodeEcho(req).toFuture());
  }
  await Future.wait(futures);
  jsonEncodeSw.stop();

  Squadron.info(
      '[${echoService.runtimeType}] json = ${jsonSw.elapsed} / json encode = ${jsonEncodeSw.elapsed}');
}

extension Futurizer<X> on FutureOr<X> {
  Future<X> toFuture() {
    final value = this;
    return (value is X) ? Future.value(value) : value;
  }
}
22
likes
0
points
4.78k
downloads

Publisher

unverified uploader

Weekly Downloads

Dart code generator for Squadron workers. Implement your worker service and let squadron_builder bridge the gap with Web Workers and Isolates!

Repository (GitHub)
View/report issues

License

unknown (license)

Dependencies

analyzer, build, source_gen, squadron

More

Packages that depend on squadron_builder