Python中的哈希函数家族生成器

11 投票
5 回答
7799 浏览
提问于 2025-04-15 19:13

我在找一个可以生成哈希函数家族的工具,这个工具能根据一组参数生成一系列哈希函数。到目前为止,我还没有找到这样的工具。

请问有没有办法用 hashlib 这个包来做到这一点呢?

举个例子,我想做类似这样的事情:

h1 = hash_function(1)
h2 = hash_function(2)
...

这样 h1h2 就会是不同的哈希函数。

对于那些可能知道的人,我正在尝试在一个非常大的数据集上实现一个最小哈希算法。

基本上,我有一大堆特征(从1亿到10亿个)用于某个文档,我需要为这组特征创建1000到10000个不同的随机排列。

我并不想明确地构建这些随机排列,所以我想用以下的方法:

  1. 生成一个哈希函数 h,并考虑两个索引 rs
  2. 如果在排列中 r 出现在 s 之前,那么就有 h(r) < h(s),然后对100到1000个不同的哈希函数这样做。

有没有我可能错过的已知库?或者你知道的用Python生成哈希函数家族的标准方法吗?

5 个回答

1

@alex的回答很好且简洁,但它生成的哈希函数之间并不是“非常不同”。

我们来看一下10000个样本中10000个哈希值的皮尔逊相关性,这些结果被放入了100个箱子里。

%%time # 1min 14s
n=10000
hashes = [hash_function(i) for i in range(n)]
median_pvalue(hashes, n=n)
# 1.1614081043690444e-06

也就是说,中位数的p值是1e-06,这远远不是随机的。如果它真的是随机的,情况会是这样的:

%%time # 4min 15s
hashes = [lambda _ : random.randint(0,100) for _ in range(n)]
median_pvalue(hashes, n=n)
# 0.4979718236429698

使用卡特和韦格曼的方法,你可以得到:

%%time # 1min 43s
hashes = HashFamily(100).draw_hashes(n)
median_pvalue(hashes, n=n)
# 0.841929288037321

重现的代码:


from scipy.stats.stats import pearsonr 
import numpy as np
import random

_memomask = {}

def hash_function(n):
    mask = _memomask.get(n)
    if mask is None:
        random.seed(n)
        mask = _memomask[n] = random.getrandbits(32)
    def myhash(x):
        return hash(x) ^ mask
    return myhash

class HashFamily():
    r"""Universal hash family as proposed by Carter and Wegman.
    .. math::
            \begin{array}{ll}
            h_{{a,b}}(x)=((ax+b)~{\bmod  ~}p)~{\bmod  ~}m \ \mid p > m\\
            \end{array}
    Args:
        bins (int): Number of bins to hash to. Better if a prime number.
        moduler (int,optional): Temporary hashing. Has to be a prime number.
    """
    def __init__(self, bins, moduler=None):
        if moduler and moduler <= bins:
            raise ValueError("p (moduler) should be >> m (buckets)")

        self.bins = bins
        self.moduler = moduler if moduler else self._next_prime(np.random.randint(self.bins + 1, 2**32))

        # do not allow same a and b, as it could mean shifted hashes
        self.sampled_a = set()
        self.sampled_b = set()

    def _is_prime(self, x):
        """Naive is prime test."""
        for i in range(2, int(np.sqrt(x))):
            if x % i == 0:
                return False
        return True

    def _next_prime(self, n):
        """Naively gets the next prime larger than n."""
        while not self._is_prime(n):
            n += 1

        return n

    def draw_hash(self, a=None, b=None):
        """Draws a single hash function from the family."""
        if a is None:
            while a is None or a in self.sampled_a:
                a = np.random.randint(1, self.moduler - 1)
                assert len(self.sampled_a) < self.moduler - 2, "please give a bigger moduler"

            self.sampled_a.add(a)
        if b is None:
            while b is None or b in self.sampled_b:
                b = np.random.randint(0, self.moduler - 1)
                assert len(self.sampled_b) < self.moduler - 1, "please give a bigger moduler"

            self.sampled_b.add(b)

        return lambda x: ((a * x + b) % self.moduler) % self.bins

    def draw_hashes(self, n, **kwargs):
        """Draws n hash function from the family."""
        return [self.draw_hash() for i in range(n)]

def median_pvalue(hashes, buckets=100, n=1000):
    p_values = []
    for j in range(n-1):
        a = [hashes[j](i) % buckets for i in range(n)]
        b = [hashes[j+1](i) % buckets for i in range(n)]
        p_values.append(pearsonr(a,b)[1])
    return np.median(p_values)

请注意,我对卡特和韦格曼的实现非常简单(例如,生成素数的方式)。这个过程可以做得更短、更快。

3

如上所述,你可以使用通用哈希来进行最小哈希(minhash)。

import random



def minhash():
    d1 = set(random.randint(0, 2000) for _ in range(1000))
    d2 = set(random.randint(0, 2000) for _ in range(1000))
    jacc_sim = len(d1.intersection(d2)) / len(d1.union(d2))
    print("jaccard similarity: {}".format(jacc_sim))

    N_HASHES = 200
    hash_funcs = []
    for i in range(N_HASHES):
        hash_funcs.append(universal_hashing())

    m1 = [min([h(e) for e in d1]) for h in hash_funcs]
    m2 = [min([h(e) for e in d2]) for h in hash_funcs]
    minhash_sim = sum(int(m1[i] == m2[i]) for i in range(N_HASHES)) / N_HASHES
    print("min-hash similarity: {}".format(minhash_sim))



def universal_hashing():
    def rand_prime():
        while True:
            p = random.randrange(2 ** 32, 2 ** 34, 2)
            if all(p % n != 0 for n in range(3, int((p ** 0.5) + 1), 2)):
                return p
    m = 2 ** 32 - 1
    p = rand_prime()
    a = random.randint(0, p)
    if a % 2 == 0:
        a += 1
    b = random.randint(0, p)
    def h(x):
        return ((a * x + b) % p) % m
    return h

参考链接

11

我会这样做(如果你不需要线程安全的话——如果需要线程安全的话,改起来也不难——假设你使用的是32位的Python版本):

import random

_memomask = {}

def hash_function(n):
  mask = _memomask.get(n)
  if mask is None:
    random.seed(n)
    mask = _memomask[n] = random.getrandbits(32)
  def myhash(x):
    return hash(x) ^ mask
  return myhash

撰写回答