dataset = [['Milk', 'Onion', 'Nutmeg', 'Kidney Beans', 'Eggs', 'Yogurt'],
           ['Dill', 'Onion', 'Nutmeg', 'Kidney Beans', 'Eggs', 'Yogurt'],
           ['Milk', 'Apple', 'Kidney Beans', 'Eggs'],
           ['Milk', 'Unicorn', 'Corn', 'Kidney Beans', 'Yogurt'],
           ['Corn', 'Onion', 'Onion', 'Kidney Beans', 'Ice cream', 'Eggs']]

one_ary = np.array([[0, 0, 0, 1, 0, 1, 1, 1, 1, 0, 1],
                    [0, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1],
                    [1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0],
                    [0, 1, 0, 0, 0, 1, 1, 0, 0, 1, 1],
                    [0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 0]])

cols = ['Apple', 'Corn', 'Dill', 'Eggs', 'Ice cream', 'Kidney Beans', 'Milk',
        'Nutmeg', 'Onion', 'Unicorn', 'Yogurt']

df = pd.DataFrame(one_ary, columns=cols)


TypeError                                 Traceback (most recent call last)
<ipython-input-17-c7221dc109b5> in <module>()
      1 from math import sqrt
      2 from joblib import Parallel, delayed
----> 3 out = Parallel(n_jobs=1, verbose=100, pre_dispatch='1.5*n_jobs')(delayed(sqrt)(i) for i in apriori(df))

D:\Anaconda3\lib\site-packages\joblib\parallel.py in __call__(self, iterable)
    777             # was dispatched. In particular this covers the edge
    778             # case of Parallel used with an exhausted iterator.
--> 779             while self.dispatch_one_batch(iterator):
    780                 self._iterating = True
    781             else:

D:\Anaconda3\lib\site-packages\joblib\parallel.py in dispatch_one_batch(self, iterator)
    623                 return False
    624             else:
--> 625                 self._dispatch(tasks)
    626                 return True

D:\Anaconda3\lib\site-packages\joblib\parallel.py in _dispatch(self, batch)
    586         dispatch_timestamp = time.time()
    587         cb = BatchCompletionCallBack(dispatch_timestamp, len(batch), self)
--> 588         job = self._backend.apply_async(batch, callback=cb)
    589         self._jobs.append(job)

D:\Anaconda3\lib\site-packages\joblib\_parallel_backends.py in apply_async(self, func, callback)
    109     def apply_async(self, func, callback=None):
    110         """Schedule a func to be run"""
--> 111         result = ImmediateResult(func)
    112         if callback:
    113             callback(result)

D:\Anaconda3\lib\site-packages\joblib\_parallel_backends.py in __init__(self, batch)
    330         # Don't delay the application, to avoid keeping the input
    331         # arguments in memory
--> 332         self.results = batch()
    334     def get(self):

D:\Anaconda3\lib\site-packages\joblib\parallel.py in __call__(self)
    130     def __call__(self):
--> 131         return [func(*args, **kwargs) for func, args, kwargs in self.items]
    133     def __len__(self):

D:\Anaconda3\lib\site-packages\joblib\parallel.py in <listcomp>(.0)
    130     def __call__(self):
--> 131         return [func(*args, **kwargs) for func, args, kwargs in self.items]
    133     def __len__(self):

TypeError: a float is required

from math import sqrt
from joblib import Parallel, delayed
out = Parallel(n_jobs=1, verbose=100, pre_dispatch='1.5*n_jobs')
(delayed(sqrt)(i) for i in apriori(df))



from itertools import combinations
import numpy as np
import pandas as pd

def apriori(df, min_support=0.5, use_colnames=False, max_len=None):
    """Get frequent itemsets from a one-hot DataFrame
    df : pandas DataFrame
      pandas DataFrame in one-hot encoded format. For example
             Apple  Bananas  Beer  Chicken  Milk  Rice
        0      1        0     1        1     0     1
        1      1        0     1        0     0     1
        2      1        0     1        0     0     0
        3      1        1     0        0     0     0
        4      0        0     1        1     1     1
        5      0        0     1        0     1     1
        6      0        0     1        0     1     0
        7      1        1     0        0     0     0
    min_support : float (default: 0.5)
      A float between 0 and 1 for minumum support of the itemsets returned.
      The support is computed as the fraction
      transactions_where_item(s)_occur / total_transactions.
    use_colnames : bool (default: False)
      If true, uses the DataFrames' column names in the returned DataFrame
      instead of column indices.
    max_len : int (default: None)
      Maximum length of the itemsets generated. If `None` (default) all
      possible itemsets lengths (under the apriori condition) are evaluated.
    pandas DataFrame with columns ['support', 'itemsets'] of all itemsets
    that are >= `min_support` and < than `max_len` (if `max_len` is not None).

    X = df.values
    ary_col_idx = np.arange(X.shape[1])
    support = (np.sum(X, axis=0) / float(X.shape[0]))
    support_dict = {1: support[support >= min_support]}
    itemset_dict = {1: ary_col_idx[support >= min_support].reshape(-1, 1)}
    max_itemset = 1

    if max_len is None:
        max_len = float('inf')

    while max_itemset and max_itemset < max_len:
        next_max_itemset = max_itemset + 1
        combin = combinations(np.unique(itemset_dict[max_itemset].flatten()),
        frequent_items = []
        frequent_items_support = []

        for c in combin:
            together = X[:, c].sum(axis=1) == len(c)
            support = together.sum() / float(X.shape[0])
            if support >= min_support:

        if frequent_items:
            itemset_dict[next_max_itemset] = np.array(frequent_items)
            support_dict[next_max_itemset] = np.array(frequent_items_support)
            max_itemset = next_max_itemset
            max_itemset = 0

    all_res = []
    for k in sorted(itemset_dict):
        support = pd.Series(support_dict[k])
        itemsets = pd.Series([i for i in itemset_dict[k]])

        res = pd.concat((support, itemsets), axis=1)

    res_df = pd.concat(all_res)
    res_df.columns = ['support', 'itemsets']
    if use_colnames:
        mapping = {idx: item for idx, item in enumerate(df.columns)}
        res_df['itemsets'] = res_df['itemsets'].apply(lambda x: [mapping[i]
                                                      for i in x])
    res_df = res_df.reset_index(drop=True)

    return res_df


pre_dispatch: {‘all’,integer, or expression, as in ‘3*n_jobs’}

人们可能会用一些更疯狂和危险的东西来挑战可能的字符串表达式的解释,比如"max( 2, int( 1.5 * n_jobs ) )"


人们还可能会反对,当使用基于multiprocessing的后端时,joblib服务诉诸于全过程实例化,如果只要求math.sqrt(i)结果,那么这样做的代价(在计算复杂性的[PTIME]和[PSPACE]维度)显然很低。适当的成本报酬评估应该是合理的步骤(Ref.: Overheads and related sections in Amdahl's Law Criticism)。你知道吗

