BitMap实现海量消息推送、已读记录的实现方案 - ZhangTory's NoteBlog - 张耀誉的笔记博客

BitMap实现海量消息推送、已读记录的实现方案

我们app有消息箱功能,可以接收系统发送的消息。
以前服务器推送的消息是缓存在用户手机中的,但是这种方法用户体验不好,所以现在需要将数据存放在服务器上。
系统会根据大数据的判断,给不同的用户发送不同的消息,所以需要记录用户是否有阅读该消息的权限,也就是推送记录。
同时app的用户很多,如果按照关系型数据库,每个用户创建一条记录,我们预计2个月就会生成1亿条数据,会给数据库带来很大的压力。
这种情况我们其实只需要记录是否推送、是否已读,用BitMap就能很好的节省空间,同时有很好的查询性能。

0. 什么是BitMap

简单的说我们用1个bit来代表true或false,然后你可以将若干个bit想象成一个数组,然后用下标寻址,读取对应的bit,理论上讲是O(1)的时间复杂度,空间复杂度为O(n)。
比如java中一个int是4个字节,也就是32个bit,所以一个int就是一个大小为32的数组。如果你的数据有50个,那么就需要2个int拼接起来,组成一个64大小的数组,以此类推。

我们假设一年365天,每天每个用户平均推送5条消息,那么1个用户1年有365*5=1825条消息,所以每个用户我们需要229个字节就能保存下来。
如果我们有50W用户,就需要114,500,000个字节,也就是不到110M的空间。推送和记录分别存储,所以一共是不到220M。从成本上来说能够接受。

1. BitMap的压缩

虽然BitMap已经非常节省空间了,但是还有进一步节省空间的方法,那就是数据压缩。
在BitMap中很可能出现连续的1或者0,我们可以通过对这些连续的值进行压缩,从而节省空间。
比如Google的EWAHCompressedBitmap使用了RLE(RunningLengthEncode)对数据进行压缩。
这项技术对于已读记录十分有效,因为大部分人都会强迫性的点掉未读,而对于不去看消息的人,则会有很多的未读。

2. BitMap的实现方案

第一种方法是JDK自带的BitSet,它能满足基本的使用,但是它没有实现数据压缩功能,同时需要考虑如何将BitMap中的数据落地。
第二种方法是使用Google的EWAHCompressedBitmap,它实现了数据压缩,但是也需要考虑数据落地的问题。
第三种方案是使用Redis,它没有实现数据压缩,但是Redis可以实现数据持久化,而且不存在序列化和反序列化过程,从性能上来说可能最快。

3. 需求分析,定义接口

本次内容我们需要实现标记用户是否有权限阅读消息、标记用户该消息是否已读、判断用户是否能阅读该消息、判断用户该消息是否已读,以及倒序分页查询用户可读消息列表。
之后我们不论用哪种方案,都需要实现这些功能。
需要实现的接口功能如下:

public interface UserMessageBitMap {

    /**
     * 用户推送消息记录
     * @param userId
     * @param messageId
     * @throws IOException
     */
    void pushRecord(Long userId, Integer messageId) throws IOException;

    /**
     * 判断该消息是否有权限读取(只有推送过才能读取)
     * @param userId
     * @param messageId
     * @return
     * @throws IOException
     */
    Boolean isReadable(Long userId, Integer messageId) throws IOException;

    /**
     * 用户已读消息记录
     * @param userId
     * @param messageId
     * @throws IOException
     */
    void readRecord(Long userId, Integer messageId) throws IOException;

    /**
     * 文章是否已读
     * @param userId
     * @param messageId
     * @return
     * @throws IOException
     */
    Boolean hasBeenRead(Long userId, Integer messageId) throws IOException;

    /**
     * 用户最近推送的消息查询
     * @param userId
     * @param size 每页大小
     * @param pagingParam 分页参数。根据不同实现方法确定.
     * @return
     * @throws IOException
     */
    List<Integer> getUserLastPushMessage(Long userId, Integer size, Integer... pagingParam) throws IOException;

}

4. EWAHCompressedBitmap的实现

JDK自带的BitSet没有优势所以就直接跳过了。
先考虑有数据压缩功能的EWAHCompressedBitmap。
我们使用MyBatisPlus辅助数据库操作,不过还是推荐考虑使用MongoDB等非关系型数据库进行存储。
所有的操作都需要序列号serializeBitMap和反序列化deserializeBigMap。
倒序分页查询用户可读消息列表,可以使用reverseIntIterator,比较方便。
分页时由于不能从中间某个位置开始查询,所以需要跳过一些id。
UserMessageRecordService是MyBatisPlus的通用service。

@Service
@Slf4j
public class CompressedUserMessageBitMap implements UserMessageBitMap {

    @Autowired
    private UserMessageRecordService userMessageService;

    /**
     * 用户推送消息记录
     *
     * @param userId
     * @param messageId
     * @throws IOException
     */
    @Override
    public void pushRecord(Long userId, Integer messageId) throws IOException {
        UserMessageRecord userRecord = userMessageService.getById(userId);
        EWAHCompressedBitmap pushBitmap = EWAHCompressedBitmap.bitmapOf(messageId);
        if (userRecord == null) {
            userRecord = new UserMessageRecord();
            userRecord.setUserId(userId);
        } else if (userRecord.getPushRecord() != null) {
            EWAHCompressedBitmap recordBitmap = deserializeBigMap(userRecord.getPushRecord());
            pushBitmap = recordBitmap.or(pushBitmap);
        }
        userRecord.setPushRecord(serializeBitMap(pushBitmap));
        userMessageService.saveOrUpdate(userRecord);
    }

    /**
     * 判断该消息是否有权限读取(只有推送过才能读取)
     *
     * @param userId
     * @param messageId
     * @return
     * @throws IOException
     */
    @Override
    public Boolean isReadable(Long userId, Integer messageId) throws IOException {
        UserMessageRecord userRecord = userMessageService.getById(userId);
        if (userRecord == null || userRecord.getPushRecord() == null) {
            return false;
        }
        EWAHCompressedBitmap messageBitMap = deserializeBigMap(userRecord.getPushRecord());
        return messageBitMap.get(messageId);
    }

    /**
     * 用户已读消息记录
     *
     * @param userId
     * @param messageId
     * @throws IOException
     */
    @Override
    public void readRecord(Long userId, Integer messageId) throws IOException {
        UserMessageRecord userRecord = userMessageService.getById(userId);
        EWAHCompressedBitmap readBitmap = EWAHCompressedBitmap.bitmapOf(messageId);
        if (userRecord == null) {
            userRecord = new UserMessageRecord();
            userRecord.setUserId(userId);
        } else if (userRecord.getReadRecord() != null) {
            EWAHCompressedBitmap recordBitmap = deserializeBigMap(userRecord.getReadRecord());
            readBitmap = recordBitmap.or(readBitmap);
        }
        userRecord.setReadRecord(serializeBitMap(readBitmap));
        userMessageService.saveOrUpdate(userRecord);
    }

    /**
     * 文章是否已读
     *
     * @param userId
     * @param messageId
     * @return
     * @throws IOException
     */
    @Override
    public Boolean hasBeenRead(Long userId, Integer messageId) throws IOException {
        UserMessageRecord userRecord = userMessageService.getById(userId);
        if (userRecord == null || userRecord.getReadRecord() == null) {
            return false;
        }
        EWAHCompressedBitmap messageBitMap = deserializeBigMap(userRecord.getReadRecord());
        return messageBitMap.get(messageId);
    }

    /**
     * 用户最近推送的消息查询
     *
     * @param userId
     * @param size        每页大小
     * @param pagingParam pageNo 查询页数
     * @return
     * @throws IOException
     */
    @Override
    public List<Integer> getUserLastPushMessage(Long userId, Integer size, Integer... pagingParam) throws IOException {
        List<Integer> messageList = new ArrayList<>();
        UserMessageRecord userRecord = userMessageService.getById(userId);
        if (userRecord == null || userRecord.getPushRecord() == null) {
            return messageList;
        }
        // 提取查询的页数
        Integer pageNo = Integer.valueOf(pagingParam[0]);
        if (pageNo <= 0) {
            return messageList;
        }
        EWAHCompressedBitmap messageBitMap = deserializeBigMap(userRecord.getPushRecord());
        IntIterator reverseIntIterator = messageBitMap.reverseIntIterator();
        Integer jump = (pageNo - 1) * size;
        while (reverseIntIterator.hasNext() && jump-- > 0) {
            reverseIntIterator.next();
        }
        while (reverseIntIterator.hasNext() && size-- > 0) {
            messageList.add(reverseIntIterator.next());
        }
        return messageList;
    }

    /**
     * 反序列化bitmap
     * @param bytes
     * @return
     * @throws IOException
     */
    private EWAHCompressedBitmap deserializeBigMap(byte[] bytes) throws IOException {
        EWAHCompressedBitmap bitmap = new EWAHCompressedBitmap();
        bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
        return bitmap;
    }

    /**
     * 序列化bitmap
     * @param bitmap
     * @return
     * @throws IOException
     */
    private byte[] serializeBitMap(EWAHCompressedBitmap bitmap) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        bitmap.serialize(new DataOutputStream(bos));
        return bos.toByteArray();
    }

}

5. redis的实现

根据团队的技术积累和可见的未来,我们最终使用了redis的方案。
每个用户一个pushRedisKey和readRedisKey,前4个方法实现非常简单,直接setBit和getBit即可。
在后续实际的编码中,我们对redisKey按月进行了拆分,这样有助于我们按月统计消息,以及灵活的删除不再需要的历史消息。
甚至我们可以根据redisKey记录的消息,做一个年终总结2333。。。
唯一要说的是倒序分页查询的方法,这里没有找到合适的方法,所以需要传入messageId,然后由此往前查询,判断是否推送,满足分页数量或messageId到0为止。
直接贴代码:

@Service
@Slf4j
public class RedisUserMessageBitMap implements UserMessageBitMap {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 消息推送记录key
     */
    private static final String MESSAGE_PUSH_RECORD_KEY = "message:push:${userId}";

    /**
     * 消息已读记录key
     */
    private static final String MESSAGE_READ_RECORD_KEY = "message:read:${userId}";

    /**
     * 用户推送消息记录
     *
     * @param userId
     * @param messageId
     * @throws IOException
     */
    @Override
    public void pushRecord(Long userId, Integer messageId) throws IOException {
        String redisKey = MESSAGE_PUSH_RECORD_KEY.replace("${userId}", userId.toString());
        redisTemplate.opsForValue().setBit(redisKey, messageId, true);
    }

    /**
     * 判断该消息是否有权限读取(只有推送过才能读取)
     *
     * @param userId
     * @param messageId
     * @return
     * @throws IOException
     */
    @Override
    public Boolean isReadable(Long userId, Integer messageId) throws IOException {
        String redisKey = MESSAGE_PUSH_RECORD_KEY.replace("${userId}", userId.toString());
        return redisTemplate.opsForValue().getBit(redisKey, messageId);
    }

    /**
     * 用户已读消息记录
     *
     * @param userId
     * @param messageId
     * @throws IOException
     */
    @Override
    public void readRecord(Long userId, Integer messageId) throws IOException {
        String redisKey = MESSAGE_READ_RECORD_KEY.replace("${userId}", userId.toString());
        redisTemplate.opsForValue().setBit(redisKey, messageId, true);
    }

    /**
     * 文章是否已读
     *
     * @param userId
     * @param messageId
     * @return
     * @throws IOException
     */
    @Override
    public Boolean hasBeenRead(Long userId, Integer messageId) throws IOException {
        String redisKey = MESSAGE_READ_RECORD_KEY.replace("${userId}", userId.toString());
        return redisTemplate.opsForValue().getBit(redisKey, messageId);
    }

    /**
     * 用户最近推送的消息查询
     *
     * @param userId
     * @param size        每页大小
     * @param pagingParam maxMessageId 最大消息id
     * @return
     * @throws IOException
     */
    @Override
    public List<Integer> getUserLastPushMessage(Long userId, Integer size, Integer... pagingParam) throws IOException {
        String redisKey = MESSAGE_PUSH_RECORD_KEY.replace("${userId}", userId.toString());
        List<Integer> list = new ArrayList<>();
        // 提取最大消息id
        Integer startMessageId = Integer.valueOf(pagingParam[0]);
        while (size > 0 && startMessageId > 0) {
            if (redisTemplate.opsForValue().getBit(redisKey, startMessageId)) {
                list.add(startMessageId);
                size--;
            }
            startMessageId--;
        }
        return list;
    }

}

6. 单元测试

再加上数据库相关的代码,最后整个工程的结构如下:

提供UserMessageBitMap接口,完成了RedisUserMessageBitMap和CompressedUserMessageBitMap两种实现。
最后跑一下测试代码,确保满足我们的预期。
EWAHCompressedBitmap实现:

@SpringBootTest
@Slf4j
class CompressedUserMessageBitMapTest {

    @Autowired
    private CompressedUserMessageBitMap userMessageBitMap;

    private Long userId = 1L;

    @Test
    void pushMessageAndCheckReadable() throws IOException {
        userMessageBitMap.pushRecord(userId, 2);
        userMessageBitMap.pushRecord(userId, 3);
        userMessageBitMap.pushRecord(userId, 4);
        userMessageBitMap.pushRecord(userId, 6);
        userMessageBitMap.pushRecord(userId, 8);
        userMessageBitMap.pushRecord(userId, 10);
        userMessageBitMap.pushRecord(userId, 11);
        Boolean readableOne = userMessageBitMap.isReadable(userId, 1);
        Assertions.assertFalse(readableOne);
        Boolean readableTwo = userMessageBitMap.isReadable(userId, 2);
        Assertions.assertTrue(readableTwo);

        Boolean readableNull = userMessageBitMap.isReadable(2L, 1);
        Assertions.assertFalse(readableNull);
    }

    @Test
    void readMessageAndCheckHasBeenRead() throws IOException {
        userMessageBitMap.readRecord(userId, 8);
        Boolean readEight = userMessageBitMap.hasBeenRead(userId, 8);
        Assertions.assertTrue(readEight);
        Boolean readTen = userMessageBitMap.hasBeenRead(userId, 10);
        Assertions.assertFalse(readTen);


        Boolean readNull = userMessageBitMap.hasBeenRead(2L, 1);
        Assertions.assertFalse(readNull);
        userMessageBitMap.readRecord(2L, 1);
        Boolean readNotNull = userMessageBitMap.hasBeenRead(2L, 1);
        Assertions.assertTrue(readNotNull);
    }

    @Test
    void getLastMessageList() throws IOException {
        Integer pageNo = 1;
        List<Integer> messageList1 = userMessageBitMap.getUserLastPushMessage(userId, 3, pageNo);
        Assertions.assertArrayEquals(new Integer[]{11, 10, 8}, messageList1.toArray());
        pageNo++;
        List<Integer> messageList2 = userMessageBitMap.getUserLastPushMessage(userId, 3, pageNo);
        Assertions.assertArrayEquals(new Integer[]{6, 4, 3}, messageList2.toArray());
        pageNo++;
        List<Integer> messageList3 = userMessageBitMap.getUserLastPushMessage(userId, 3, pageNo);
        Assertions.assertArrayEquals(new Integer[]{2}, messageList3.toArray());

        List<Integer> messageList0 = userMessageBitMap.getUserLastPushMessage(userId, 3, 0);
        Assertions.assertEquals(0, messageList0.size());
    }

}

Redis实现:

@SpringBootTest
@Slf4j
class RedisUserMessageBitMapTest {

    @Autowired
    private RedisUserMessageBitMap userMessageBitMap;

    private Long userId = 1L;

    @Test
    void pushMessageAndCheckReadable() throws IOException {
        userMessageBitMap.pushRecord(userId, 2);
        userMessageBitMap.pushRecord(userId, 3);
        userMessageBitMap.pushRecord(userId, 4);
        userMessageBitMap.pushRecord(userId, 6);
        userMessageBitMap.pushRecord(userId, 8);
        userMessageBitMap.pushRecord(userId, 10);
        userMessageBitMap.pushRecord(userId, 11);
        Boolean readableOne = userMessageBitMap.isReadable(userId, 1);
        Assertions.assertFalse(readableOne);
        Boolean readableTwo = userMessageBitMap.isReadable(userId, 2);
        Assertions.assertTrue(readableTwo);
    }

    @Test
    void readMessageAndCheckHasBeenRead() throws IOException {
        userMessageBitMap.readRecord(userId, 8);
        Boolean readEight = userMessageBitMap.hasBeenRead(userId, 8);
        Assertions.assertTrue(readEight);
        Boolean readTen = userMessageBitMap.hasBeenRead(userId, 10);
        Assertions.assertFalse(readTen);
    }

    @Test
    void getLastMessageList() throws IOException {
        Integer maxMessageId = 11;
        List<Integer> messageList1 = userMessageBitMap.getUserLastPushMessage(userId, 3, maxMessageId);
        Assertions.assertArrayEquals(new Integer[]{11, 10, 8}, messageList1.toArray());
        for (Integer messageId : messageList1) {
            maxMessageId = Math.min(maxMessageId, messageId);
        }
        List<Integer> messageList2 = userMessageBitMap.getUserLastPushMessage(userId, 3, maxMessageId - 1);
        Assertions.assertArrayEquals(new Integer[]{6, 4, 3}, messageList2.toArray());
        for (Integer messageId : messageList2) {
            maxMessageId = Math.min(maxMessageId, messageId);
        }
        List<Integer> messageList3 = userMessageBitMap.getUserLastPushMessage(userId, 3, maxMessageId - 1);
        Assertions.assertArrayEquals(new Integer[]{2}, messageList3.toArray());
    }

}

调整下执行顺序,最终能实现100%的测试覆盖。

添加新评论

电子邮件地址不会被公开,评论内容可能需要管理员审核后显示。