mine method
Mines the frequent itemsets from the given transaction stream.
`streamProvider` is a function that returns a new stream of transactions
for each pass of the algorithm. This is crucial for handling large
datasets from sources like files, as it allows the data to be processed
without being fully loaded into memory.
Returns a record containing a map of frequent itemsets to their support counts, and the total number of non-empty transactions processed.
Implementation
/// Mines the frequent itemsets from the given transaction stream.
///
/// [streamProvider] is invoked once for each pass over the data and should
/// return a new stream of transactions every time. This is crucial for
/// handling large datasets from sources like files, as it allows the data to
/// be processed without being fully loaded into memory.
///
/// Empty transactions are ignored. An item that occurs more than once within
/// a single transaction contributes only once to that item's support.
///
/// Returns a record containing a map of frequent itemsets to their support
/// counts, and the total number of non-empty transactions processed. The map
/// is empty when no non-empty transaction was seen or when no item passes the
/// frequent-item filter.
Future<(Map<List<T>, int>, int)> mine(
  Stream<List<T>> Function() streamProvider,
) async {
  _logger.info(
    'Starting frequent itemset mining with minSupport: $minSupport',
  );

  // Pass 1: Calculate frequencies and count transactions.
  _logger.debug('Pass 1: Calculating initial item frequencies...');
  final frequency = <int, int>{};
  var transactionCount = 0;
  await for (final transaction in streamProvider()) {
    if (transaction.isEmpty) continue;
    transactionCount++;

    // Support is counted per transaction, so repeated items inside one
    // transaction are collapsed before updating the counters.
    final ids = <int>{for (final item in transaction) _mapper.getId(item)};
    for (final id in ids) {
      frequency[id] = (frequency[id] ?? 0) + 1;
    }
  }

  if (transactionCount == 0) {
    _logger.warning('No non-empty transactions to mine');
    return (<List<T>, int>{}, 0);
  }
  _logger.debug('Found $transactionCount non-empty transactions.');

  final absoluteMinSupport = calculateAbsoluteMinSupport(
    minSupport,
    transactionCount,
  );

  _logger.debug('Filtering frequent items...');
  final frequentItems = filterFrequentItems(frequency, absoluteMinSupport);
  if (frequentItems.isEmpty) {
    _logger.warning('No frequent items found with minSupport: $minSupport');
    return (<List<T>, int>{}, transactionCount);
  }
  _logger.debug('Found ${frequentItems.length} frequent items.');

  // Pass 2: Build the FP-Tree from a second read of the transactions.
  _logger.debug('Pass 2: Building FP-Tree...');
  final tree = FPTree(frequentItems);
  await for (final transaction in streamProvider()) {
    // Deduplicate ids the same way as pass 1. A set literal keeps insertion
    // order, so the first occurrence of each item determines its position.
    final ids = <int>{for (final item in transaction) _mapper.getId(item)};
    final orderedItems = _prepareOrderedTransaction(
      ids.toList(),
      frequentItems,
    );
    if (orderedItems.isNotEmpty) {
      tree.addTransaction(orderedItems, 1);
    }
  }
  _logger.debug('FP-Tree built.');

  _logger.info('Starting recursive mining...');
  final Map<List<int>, int> mappedItemsets;
  if (parallelism == 1) {
    mappedItemsets = _mineSingleThreaded(
      tree,
      frequentItems,
      absoluteMinSupport,
    );
  } else {
    mappedItemsets = await runParallelMining(
      tree: tree,
      frequentItems: frequentItems,
      absoluteMinSupport: absoluteMinSupport,
      mapper: _mapper,
      logger: _logger,
      parallelism: parallelism,
    );
  }
  _logger.info(
    'Finished mining. Found ${mappedItemsets.length} frequent itemsets.',
  );

  // Unmap the results before returning: mining works on integer ids, while
  // callers expect itemsets expressed in the original item type.
  final unmappedItemsets = mappedItemsets.map(
    (itemset, support) => MapEntry(_mapper.unmapItemset(itemset), support),
  );
  return (unmappedItemsets, transactionCount);
}