mine method

Future<(Map<List<T>, int>, int)> mine(
  Stream<List<T>> Function() streamProvider
)

Mines the frequent itemsets from the given transaction stream.

streamProvider is a function that returns a new stream of transactions for each pass of the algorithm. This is crucial for handling large datasets from sources like files, as it allows the data to be processed without being fully loaded into memory.

Returns a record containing a map of frequent itemsets to their support counts, and the total number of non-empty transactions processed.

Implementation

/// Mines the frequent itemsets from the given transaction stream.
///
/// [streamProvider] is invoked once per pass over the data (once for
/// counting and, if any frequent items exist, once more for building the
/// tree), so it must return a fresh stream each time it is called. This
/// lets the data be processed without being fully loaded into memory.
///
/// An item that appears more than once within a single transaction is
/// counted once for that transaction, so an item's support never exceeds
/// the number of non-empty transactions.
///
/// Returns a record containing a map of frequent itemsets to their support
/// counts, and the total number of non-empty transactions processed.
Future<(Map<List<T>, int>, int)> mine(
  Stream<List<T>> Function() streamProvider,
) async {
  _logger.info(
    'Starting frequent itemset mining with minSupport: $minSupport',
  );

  // Pass 1: Calculate frequencies and count transactions
  _logger.debug('Pass 1: Calculating initial item frequencies...');
  final frequency = <int, int>{};
  var transactionCount = 0;

  await for (final transaction in streamProvider()) {
    // Empty transactions are ignored and do not count toward the total.
    if (transaction.isEmpty) continue;
    transactionCount++;

    // Collapse repeated items first: support is "number of transactions
    // containing the item", not "number of occurrences of the item".
    final ids = <int>{for (final item in transaction) _mapper.getId(item)};
    for (final id in ids) {
      frequency[id] = (frequency[id] ?? 0) + 1;
    }
  }

  if (transactionCount == 0) {
    _logger.warning('No non-empty transactions to mine');
    return (<List<T>, int>{}, 0);
  }
  _logger.debug('Found $transactionCount non-empty transactions.');

  // The threshold handed to the filtering and mining steps below is derived
  // from the configured minSupport and the number of transactions seen.
  final absoluteMinSupport = calculateAbsoluteMinSupport(
    minSupport,
    transactionCount,
  );

  _logger.debug('Filtering frequent items...');
  final frequentItems = filterFrequentItems(frequency, absoluteMinSupport);

  if (frequentItems.isEmpty) {
    _logger.warning('No frequent items found with minSupport: $minSupport');
    return (<List<T>, int>{}, transactionCount);
  }

  _logger.debug('Found ${frequentItems.length} frequent items.');

  _logger.debug('Pass 2: Building FP-Tree...');
  // Pass 2: Build the FP-Tree
  final tree = FPTree(frequentItems);
  await for (final transaction in streamProvider()) {
    // Use the same distinct-id view as pass 1 so the tree's counts stay
    // consistent with the item frequencies computed above. Set literals
    // iterate in insertion order, so first-occurrence order is preserved.
    final ids = <int>{for (final item in transaction) _mapper.getId(item)};
    final orderedItems = _prepareOrderedTransaction(
      ids.toList(),
      frequentItems,
    );
    if (orderedItems.isNotEmpty) {
      tree.addTransaction(orderedItems, 1);
    }
  }
  _logger.debug('FP-Tree built.');

  _logger.info('Starting recursive mining...');
  final Map<List<int>, int> mappedItemsets;

  if (parallelism == 1) {
    mappedItemsets = _mineSingleThreaded(
      tree,
      frequentItems,
      absoluteMinSupport,
    );
  } else {
    mappedItemsets = await runParallelMining(
      tree: tree,
      frequentItems: frequentItems,
      absoluteMinSupport: absoluteMinSupport,
      mapper: _mapper,
      logger: _logger,
      parallelism: parallelism,
    );
  }

  _logger.info(
    'Finished mining. Found ${mappedItemsets.length} frequent itemsets.',
  );

  // Unmap the results before returning
  final unmappedItemsets = mappedItemsets.map(
    (itemset, support) => MapEntry(_mapper.unmapItemset(itemset), support),
  );

  return (unmappedItemsets, transactionCount);
}