maybeRebroadcastQueries method

bool maybeRebroadcastQueries({
  ObservableQuery<Object?>? exclude,
  bool force = false,
})

Rebroadcast cached queries with changed underlying data if cache.broadcastRequested or force.

Pushes changed data from the cache to the query streams. exclude is used to skip a query that was recently executed (normally the query that caused the rebroadcast).

Returns whether a broadcast was executed, which depends on the state of the cache. If there are multiple in-flight cache updates, the broadcast waits until they have all completed.

Note on internal implementation details: there is sometimes confusion about when this is called. Rebroadcasts are requested from every addQueryResult call where result.isNotLoading, as an OnData callback from ObservableQuery.

Implementation

/// Rebroadcasts cached queries whose underlying cache data has changed.
///
/// Returns `false` without touching any query when [rebroadcastLocked] is
/// set or when [cache] reports that no broadcast is due. Passing [force] as
/// `true` bypasses both checks.
///
/// Every query in [queries] is visited except [exclude] (normally the query
/// that triggered the rebroadcast) and any query whose `isRebroadcastSafe`
/// is `false`. For each remaining query the cache is read and, if
/// [_cachedDataHasChangedFor] reports a change, a [QueryResult] with
/// [QueryResultSource.cache] is pushed through [addQueryResult] with
/// `fromRebroadcast: true`.
///
/// Returns `true` once the queries have been walked, even if none of them
/// actually received a new result.
bool maybeRebroadcastQueries({
  ObservableQuery<Object?>? exclude,
  bool force = false,
}) {
  if (rebroadcastLocked && !force) {
    return false;
  }

  // The cache is consulted even when [force] is set, so the call's effects
  // always happen. `claimExecution: true` presumably marks the pending
  // broadcast as handled; confirm against the cache implementation.
  final shouldBroadcast = cache.shouldBroadcast(claimExecution: true);

  if (!shouldBroadcast && !force) {
    return false;
  }

  // If two ObservableQueries are backed by the same [Request], we only need
  // to [readQuery] and diff for it once. The first query seen for a request
  // decides which of the two collections below the request lands in.
  final changedResults = <Request, QueryResult<Object?>>{};
  final unchangedRequests = <Request>{};

  for (final query in queries.values) {
    if (query == exclude || !query.isRebroadcastSafe) {
      continue;
    }

    final request = query.options.asRequest;

    final alreadyDiffed = changedResults[request];
    if (alreadyDiffed != null) {
      // We've already done the diff and denormalized, emit to the observable.
      addQueryResult(
        request,
        query.queryId,
        alreadyDiffed,
        fromRebroadcast: true,
      );
      continue;
    }

    if (unchangedRequests.contains(request)) {
      // We've already seen this request and found nothing to notify about.
      continue;
    }

    // We haven't seen this request yet: denormalize from cache and diff.
    final cachedData = cache.readQuery(
      request,
      optimistic: query.options.policies.mergeOptimisticData,
    );

    if (!_cachedDataHasChangedFor(query, cachedData)) {
      unchangedRequests.add(request);
      continue;
    }

    // The data has changed: remember the result for later queries that share
    // this request, then emit it.
    final queryResult = QueryResult(
      data: cachedData,
      options: query.options,
      source: QueryResultSource.cache,
    );
    changedResults[request] = queryResult;
    addQueryResult(
      request,
      query.queryId,
      queryResult,
      fromRebroadcast: true,
    );
  }
  return true;
}