海量数据算法-BitMap介绍和实现

作为一个有素质的程序员,在面试中(不是) 难免会遇到海量数据相关的问题,之前有注意过java.util下面有一个BitSet数据结构,但不是很明白是做什么用的。今天就来研究一下它背后的算法——BitMap(位图)算法。

BitMap的概念

关于BitMap的介绍和实现有一篇微信文章已经讲的很清楚了:

漫画:Bitmap算法 整合版

主要的思想就是把数据对应到bit位中,从而可以实现去重、查找、排序等功能。

我这篇文章就不复制粘贴了 (全是图片粘贴个锤子),主要看看java中BitMap的实现-BitSet

BitSet实现

重要内部字段

//用于存储数据
private long[] words;

这个字段说明了底层是long数组,而一个long是8个字节也就是64位。而且在BitSet中,把这样一个long单元称为“字”(word)。

private transient int wordsInUse = 0;

这个字段代表目前long数组中不为0的元素的最大的下标。(就是目前实际用了多少个字)

构造方法

BitSet()

public BitSet() {
    initWords(BITS_PER_WORD);
    sizeIsSticky = false;
}

initWords,看方法名就是初始化words数组了,而后面的BITS_PER_WORD应该就是默认容量。sizeIsSticky的java doc描述大概意思是判断这个bitset是不是用户指定容量创建的,如果是用户指定了容量,后面就尽可能会保有这个容量。

private final static int ADDRESS_BITS_PER_WORD = 6;
private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;

private void initWords(int nbits) {
    words = new long[wordIndex(nbits-1) + 1];
}

private static int wordIndex(int bitIndex) {
    return bitIndex >> ADDRESS_BITS_PER_WORD;
}

ADDRESS_BITS_PER_WORD的意思就是一个字的地址占多少位,因为是64位是2的6次方,这里就是6。
BITS_PER_WORD就是一个字占多少位,这里就是1<<6即64位。

initWords的时候,我们传进去的默认容量为BITS_PER_WORD,这里看形参可以知道这个容量的单位是bit,那么也就是说默认构造函数只会创建一个能容纳64位的bitSet。

而BitSet怎么知道64位需要多少个字(long)呢?就是通过wordIndex去计算,将位值除以64,并向上取整。

后面wordIndex还会在索引的时候用到。

public BitSet(int nbits)

public BitSet(int nbits) {
    // nbits can't be negative; size 0 is OK
    if (nbits < 0)
        throw new NegativeArraySizeException("nbits < 0: " + nbits);

    initWords(nbits);
    sizeIsSticky = true;
}

这个构造方法就是用户自己指定初始容量了,过程和默认构造方法一样,只是把sizeIsSticky设为了true。

重要方法

set

public void set(int bitIndex) {
    if (bitIndex < 0)
        throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
    //计算出字的下标
    int wordIndex = wordIndex(bitIndex);
    expandTo(wordIndex);

    words[wordIndex] |= (1L << bitIndex); // Restores invariants
    //保证一些条件
    checkInvariants();
}

这个方法的作用就是把相应的位设置为true(1)。而expandTo()就是扩容了,要保证数组大小要大于这个字的下标。这个扩容方法我们放到后面再看。然后就是words[wordIndex] |= (1L << bitIndex); ,这里的1L << bitIndex是什么呢,java当左移超出一定范围的时候,对于long类型就是当左移位数大于等于1000000(64)的时候,只会取最后七位,也就是1L << bitIndex实际上就是bitIndex%64,算出了该位在字中的位置。然后用位或运算将该位置为1。

clear

public void clear(int bitIndex) {
    if (bitIndex < 0)
        throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);

    int wordIndex = wordIndex(bitIndex);
    if (wordIndex >= wordsInUse)
        return;

    words[wordIndex] &= ~(1L << bitIndex);

    recalculateWordsInUse();
    checkInvariants();
}

这个方法的作用就是把相应的位设置为false(0)。和set方法过程很像,用位与运算将对应的位置为0。

set(int fromIndex, int toIndex)

public void set(int fromIndex, int toIndex) {
    //检查范围是否有效
    checkRange(fromIndex, toIndex);

    if (fromIndex == toIndex)
        return;

    // Increase capacity if necessary
    // 获取起始位置和结束位置对应的字的下标,必要时扩容
    int startWordIndex = wordIndex(fromIndex);
    int endWordIndex   = wordIndex(toIndex - 1);
    expandTo(endWordIndex);

    long firstWordMask = WORD_MASK << fromIndex;
    long lastWordMask  = WORD_MASK >>> -toIndex;
    if (startWordIndex == endWordIndex) {
        // Case 1: One word
        words[startWordIndex] |= (firstWordMask & lastWordMask);
    } else {
        // Case 2: Multiple words
        // Handle first word
        words[startWordIndex] |= firstWordMask;

        // Handle intermediate words, if any
        for (int i = startWordIndex+1; i < endWordIndex; i++)
            words[i] = WORD_MASK;

        // Handle last word (restores invariants)
        words[endWordIndex] |= lastWordMask;
    }

    checkInvariants();
}

WORD_MASK是什么呢:

/* Used to shift left or right for a partial word mask */
private static final long WORD_MASK = 0xffffffffffffffffL;

就是一个全1的long型,这个函数先将起始位置中的起始位往后的位置1,然后将中间的字全部设为全1的,最后将结束位置中的结束位往前的位置置1。

intersects

有了上面那些基础方法,就可以进行一些逻辑判断了:

public boolean intersects(BitSet set) {
    for (int i = Math.min(wordsInUse, set.wordsInUse) - 1; i >= 0; i--)
        if ((words[i] & set.words[i]) != 0)
            return true;
    return false;
}

比较两个BitSet是否有相同的位被置1了。

and

public void and(BitSet set) {
    if (this == set)
        return;

    while (wordsInUse > set.wordsInUse)
        words[--wordsInUse] = 0;

    // Perform logical AND on words in common
    for (int i = 0; i < wordsInUse; i++)
        words[i] &= set.words[i];

    recalculateWordsInUse();
    checkInvariants();
}

非常常用的与运算,求两个集合的交集,需要两个bitset中对应的位都被置1。这里就是逐字做与运算。

or

public void or(BitSet set) {
    if (this == set)
        return;

    int wordsInCommon = Math.min(wordsInUse, set.wordsInUse);

    if (wordsInUse < set.wordsInUse) {
        ensureCapacity(set.wordsInUse);
        wordsInUse = set.wordsInUse;
    }

    // Perform logical OR on words in common
    for (int i = 0; i < wordsInCommon; i++)
        words[i] |= set.words[i];

    // Copy any remaining words
    if (wordsInCommon < set.wordsInUse)
        System.arraycopy(set.words, wordsInCommon,
                            words, wordsInCommon,
                            wordsInUse - wordsInCommon);

    // recalculateWordsInUse() is unnecessary
    checkInvariants();
}

非常常用的或运算,求两个集合的并集,需要两个bitset中任意一位被置1。这里就是逐字做或运算。

xor

public void xor(BitSet set) {
    int wordsInCommon = Math.min(wordsInUse, set.wordsInUse);

    if (wordsInUse < set.wordsInUse) {
        ensureCapacity(set.wordsInUse);
        wordsInUse = set.wordsInUse;
    }

    // Perform logical XOR on words in common
    for (int i = 0; i < wordsInCommon; i++)
        words[i] ^= set.words[i];

    // Copy any remaining words
    if (wordsInCommon < set.wordsInUse)
        System.arraycopy(set.words, wordsInCommon,
                            words, wordsInCommon,
                            set.wordsInUse - wordsInCommon);

    recalculateWordsInUse();
    checkInvariants();
}

异或运算。求不同时在两个集合内的元素。

扩容

最后来看扩容

private void expandTo(int wordIndex) {
    int wordsRequired = wordIndex+1;
    if (wordsInUse < wordsRequired) {
        ensureCapacity(wordsRequired);
        wordsInUse = wordsRequired;
    }
}

expandTo更新了wordsInUse,真正的扩容在ensureCapacity里面

private void ensureCapacity(int wordsRequired) {
    if (words.length < wordsRequired) {
        // 分配两倍数组大小或需要的数组大小中较大的那个。
        int request = Math.max(2 * words.length, wordsRequired);
        words = Arrays.copyOf(words, request);
        sizeIsSticky = false;
    }
}

使用实例

这样看来,BitSet好像只能存储数值类型的数据,实际上非数值类型可以使用哈希码或自定的哈希映射存储,但要保证哈希码只能唯一确定一个元素。

去重

public static void containChars(String str) {
    BitSet used = new BitSet();
    for (int i = 0; i < str.length(); i++)
        used.set(str.charAt(i)); // set bit for char
    StringBuilder sb = new StringBuilder();
    sb.append("[");
    int size = used.size();
    for (char i = 'a'; i <= 'z'; i++) {
        if (used.get(i)) {
            sb.append((char) i);
        }
    }
    sb.append("]");
    System.out.println(sb.toString());
}
public static void main(String[] args) {
    containChars("abcdfadsaomdosmdfodsingodsbnafgudbgiub");
}

(通常是遍历bitset中所有的位进行去重,这里因为字母有限可以遍历字母)

排序

排序也是遍历bitset中所有的位,但是对于哈希码存储的并没有排序作用。实际上就是用空间换时间。

public static void main(String[] args) {
    BitSet used = new BitSet();
    used.set(45);
    used.set(16);
    used.set(78);
    used.set(23);
    used.set(44);
    used.set(11);
    used.set(98);
    for(int i = 0;i<used.size();i++){
        if(used.get(i)) System.out.print(i+" ");
    }
}

查询

给40亿个不重复的unsigned int的整数,没有排过序,然后再给一个数,如果快速判断这个数是否在那40亿个数当中?

一个一个set进去,然后用get判断有没有。

(set完之后这个数据结构就可复用了)

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/%e6%b5%b7%e9%87%8f%e6%95%b0%e6%8d%ae%e7%ae%97%e6%b3%95-bitmap%e4%bb%8b%e7%bb%8d%e5%92%8c%e5%ae%9e%e7%8e%b0/