我们app有消息箱功能,可以接收系统发送的消息。
以前服务器推送的消息是缓存在用户手机中的,但是这种方法用户体验不好,所以现在需要将数据存放在服务器上。
系统会根据大数据的判断,给不同的用户发送不同的消息,所以需要记录用户是否有阅读该消息的权限,也就是推送记录。
同时app的用户很多,如果按照关系型数据库,每个用户创建一条记录,我们预计2个月就会生成1亿条数据,会给数据库带来很大的压力。
这种情况我们其实只需要记录是否推送、是否已读,用BitMap就能很好的节省空间,同时有很好的查询性能。
0. 什么是BitMap
简单的说我们用1个bit来代表true或false,然后你可以将若干个bit想象成一个数组,然后用下标寻址,读取对应的bit,理论上讲是O(1)的时间复杂度,空间复杂度为O(n)。
比如java中一个int是4个字节,也就是32个bit,所以一个int就是一个大小为32的数组。如果你的数据有50个,那么就需要2个int拼接起来,组成一个64大小的数组,以此类推。
我们假设一年365天,每天每个用户平均推送5条消息,那么1个用户1年有365*5=1825条消息,所以每个用户我们需要229个字节就能保存下来。
如果我们有50W用户,就需要114,500,000个字节,也就是不到110M的空间。推送和记录分别存储,所以一共是不到220M。从成本上来说能够接受。
1. BitMap的压缩
虽然BitMap已经非常节省空间了,但是还有进一步节省空间的方法,那就是数据压缩。
在BitMap中很可能出现连续的1或者0,我们可以通过对这些连续的值进行压缩,从而节省空间。
比如Google的EWAHCompressedBitmap使用了RLE(RunningLengthEncode)对数据进行压缩。
这项技术对于已读记录十分有效,因为大部分人都会强迫性的点掉未读,而对于不去看消息的人,则会有很多的未读。
2. BitMap的实现方案
第一种方法是JDK自带的BitSet,它能满足基本的使用,但是它没有实现数据压缩功能,同时需要考虑如何将BitMap中的数据落地。
第二种方法是使用Google的EWAHCompressedBitmap,它实现了数据压缩,但是也需要考虑数据落地的问题。
第三种方案是使用Redis,它没有实现数据压缩,但是Redis可以实现数据持久化,而且不存在序列化和反序列化过程,从性能上来说可能最快。
3. 需求分析,定义接口
本次内容我们需要实现标记用户是否有权限阅读消息、标记用户该消息是否已读、判断用户是否能阅读该消息、判断用户该消息是否已读,以及倒序分页查询用户可读消息列表。
之后我们不论用哪种方案,都需要实现这些功能。
需要实现的接口功能如下:
public interface UserMessageBitMap {
/**
* 用户推送消息记录
* @param userId
* @param messageId
* @throws IOException
*/
void pushRecord(Long userId, Integer messageId) throws IOException;
/**
* 判断该消息是否有权限读取(只有推送过才能读取)
* @param userId
* @param messageId
* @return
* @throws IOException
*/
Boolean isReadable(Long userId, Integer messageId) throws IOException;
/**
* 用户已读消息记录
* @param userId
* @param messageId
* @throws IOException
*/
void readRecord(Long userId, Integer messageId) throws IOException;
/**
* 文章是否已读
* @param userId
* @param messageId
* @return
* @throws IOException
*/
Boolean hasBeenRead(Long userId, Integer messageId) throws IOException;
/**
* 用户最近推送的消息查询
* @param userId
* @param size 每页大小
* @param pagingParam 分页参数。根据不同实现方法确定.
* @return
* @throws IOException
*/
List<Integer> getUserLastPushMessage(Long userId, Integer size, Integer... pagingParam) throws IOException;
}
4. EWAHCompressedBitmap的实现
JDK自带的BitSet没有优势所以就直接跳过了。
先考虑有数据压缩功能的EWAHCompressedBitmap。
我们使用MyBatisPlus辅助数据库操作,不过还是推荐考虑使用MongoDB等非关系型数据库进行存储。
所有的操作都需要序列号serializeBitMap和反序列化deserializeBigMap。
倒序分页查询用户可读消息列表,可以使用reverseIntIterator,比较方便。
分页时由于不能从中间某个位置开始查询,所以需要跳过一些id。
UserMessageRecordService是MyBatisPlus的通用service。
@Service
@Slf4j
public class CompressedUserMessageBitMap implements UserMessageBitMap {
@Autowired
private UserMessageRecordService userMessageService;
/**
* 用户推送消息记录
*
* @param userId
* @param messageId
* @throws IOException
*/
@Override
public void pushRecord(Long userId, Integer messageId) throws IOException {
UserMessageRecord userRecord = userMessageService.getById(userId);
EWAHCompressedBitmap pushBitmap = EWAHCompressedBitmap.bitmapOf(messageId);
if (userRecord == null) {
userRecord = new UserMessageRecord();
userRecord.setUserId(userId);
} else if (userRecord.getPushRecord() != null) {
EWAHCompressedBitmap recordBitmap = deserializeBigMap(userRecord.getPushRecord());
pushBitmap = recordBitmap.or(pushBitmap);
}
userRecord.setPushRecord(serializeBitMap(pushBitmap));
userMessageService.saveOrUpdate(userRecord);
}
/**
* 判断该消息是否有权限读取(只有推送过才能读取)
*
* @param userId
* @param messageId
* @return
* @throws IOException
*/
@Override
public Boolean isReadable(Long userId, Integer messageId) throws IOException {
UserMessageRecord userRecord = userMessageService.getById(userId);
if (userRecord == null || userRecord.getPushRecord() == null) {
return false;
}
EWAHCompressedBitmap messageBitMap = deserializeBigMap(userRecord.getPushRecord());
return messageBitMap.get(messageId);
}
/**
* 用户已读消息记录
*
* @param userId
* @param messageId
* @throws IOException
*/
@Override
public void readRecord(Long userId, Integer messageId) throws IOException {
UserMessageRecord userRecord = userMessageService.getById(userId);
EWAHCompressedBitmap readBitmap = EWAHCompressedBitmap.bitmapOf(messageId);
if (userRecord == null) {
userRecord = new UserMessageRecord();
userRecord.setUserId(userId);
} else if (userRecord.getReadRecord() != null) {
EWAHCompressedBitmap recordBitmap = deserializeBigMap(userRecord.getReadRecord());
readBitmap = recordBitmap.or(readBitmap);
}
userRecord.setReadRecord(serializeBitMap(readBitmap));
userMessageService.saveOrUpdate(userRecord);
}
/**
* 文章是否已读
*
* @param userId
* @param messageId
* @return
* @throws IOException
*/
@Override
public Boolean hasBeenRead(Long userId, Integer messageId) throws IOException {
UserMessageRecord userRecord = userMessageService.getById(userId);
if (userRecord == null || userRecord.getReadRecord() == null) {
return false;
}
EWAHCompressedBitmap messageBitMap = deserializeBigMap(userRecord.getReadRecord());
return messageBitMap.get(messageId);
}
/**
* 用户最近推送的消息查询
*
* @param userId
* @param size 每页大小
* @param pagingParam pageNo 查询页数
* @return
* @throws IOException
*/
@Override
public List<Integer> getUserLastPushMessage(Long userId, Integer size, Integer... pagingParam) throws IOException {
List<Integer> messageList = new ArrayList<>();
UserMessageRecord userRecord = userMessageService.getById(userId);
if (userRecord == null || userRecord.getPushRecord() == null) {
return messageList;
}
// 提取查询的页数
Integer pageNo = Integer.valueOf(pagingParam[0]);
if (pageNo <= 0) {
return messageList;
}
EWAHCompressedBitmap messageBitMap = deserializeBigMap(userRecord.getPushRecord());
IntIterator reverseIntIterator = messageBitMap.reverseIntIterator();
Integer jump = (pageNo - 1) * size;
while (reverseIntIterator.hasNext() && jump-- > 0) {
reverseIntIterator.next();
}
while (reverseIntIterator.hasNext() && size-- > 0) {
messageList.add(reverseIntIterator.next());
}
return messageList;
}
/**
* 反序列化bitmap
* @param bytes
* @return
* @throws IOException
*/
private EWAHCompressedBitmap deserializeBigMap(byte[] bytes) throws IOException {
EWAHCompressedBitmap bitmap = new EWAHCompressedBitmap();
bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
return bitmap;
}
/**
* 序列化bitmap
* @param bitmap
* @return
* @throws IOException
*/
private byte[] serializeBitMap(EWAHCompressedBitmap bitmap) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
bitmap.serialize(new DataOutputStream(bos));
return bos.toByteArray();
}
}
5. redis的实现
根据团队的技术积累和可见的未来,我们最终使用了redis的方案。
每个用户一个pushRedisKey和readRedisKey,前4个方法实现非常简单,直接setBit和getBit即可。
在后续实际的编码中,我们对redisKey按月进行了拆分,这样有助于我们按月统计消息,以及灵活的删除不再需要的历史消息。
甚至我们可以根据redisKey记录的消息,做一个年终总结2333。。。
唯一要说的是倒序分页查询的方法,这里没有找到合适的方法,所以需要传入messageId,然后由此往前查询,判断是否推送,满足分页数量或messageId到0为止。
直接贴代码:
@Service
@Slf4j
public class RedisUserMessageBitMap implements UserMessageBitMap {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 消息推送记录key
*/
private static final String MESSAGE_PUSH_RECORD_KEY = "message:push:${userId}";
/**
* 消息已读记录key
*/
private static final String MESSAGE_READ_RECORD_KEY = "message:read:${userId}";
/**
* 用户推送消息记录
*
* @param userId
* @param messageId
* @throws IOException
*/
@Override
public void pushRecord(Long userId, Integer messageId) throws IOException {
String redisKey = MESSAGE_PUSH_RECORD_KEY.replace("${userId}", userId.toString());
redisTemplate.opsForValue().setBit(redisKey, messageId, true);
}
/**
* 判断该消息是否有权限读取(只有推送过才能读取)
*
* @param userId
* @param messageId
* @return
* @throws IOException
*/
@Override
public Boolean isReadable(Long userId, Integer messageId) throws IOException {
String redisKey = MESSAGE_PUSH_RECORD_KEY.replace("${userId}", userId.toString());
return redisTemplate.opsForValue().getBit(redisKey, messageId);
}
/**
* 用户已读消息记录
*
* @param userId
* @param messageId
* @throws IOException
*/
@Override
public void readRecord(Long userId, Integer messageId) throws IOException {
String redisKey = MESSAGE_READ_RECORD_KEY.replace("${userId}", userId.toString());
redisTemplate.opsForValue().setBit(redisKey, messageId, true);
}
/**
* 文章是否已读
*
* @param userId
* @param messageId
* @return
* @throws IOException
*/
@Override
public Boolean hasBeenRead(Long userId, Integer messageId) throws IOException {
String redisKey = MESSAGE_READ_RECORD_KEY.replace("${userId}", userId.toString());
return redisTemplate.opsForValue().getBit(redisKey, messageId);
}
/**
* 用户最近推送的消息查询
*
* @param userId
* @param size 每页大小
* @param pagingParam maxMessageId 最大消息id
* @return
* @throws IOException
*/
@Override
public List<Integer> getUserLastPushMessage(Long userId, Integer size, Integer... pagingParam) throws IOException {
String redisKey = MESSAGE_PUSH_RECORD_KEY.replace("${userId}", userId.toString());
List<Integer> list = new ArrayList<>();
// 提取最大消息id
Integer startMessageId = Integer.valueOf(pagingParam[0]);
while (size > 0 && startMessageId > 0) {
if (redisTemplate.opsForValue().getBit(redisKey, startMessageId)) {
list.add(startMessageId);
size--;
}
startMessageId--;
}
return list;
}
}
6. 单元测试
再加上数据库相关的代码,最后整个工程的结构如下:
提供UserMessageBitMap接口,完成了RedisUserMessageBitMap和CompressedUserMessageBitMap两种实现。
最后跑一下测试代码,确保满足我们的预期。
EWAHCompressedBitmap实现:
@SpringBootTest
@Slf4j
class CompressedUserMessageBitMapTest {
@Autowired
private CompressedUserMessageBitMap userMessageBitMap;
private Long userId = 1L;
@Test
void pushMessageAndCheckReadable() throws IOException {
userMessageBitMap.pushRecord(userId, 2);
userMessageBitMap.pushRecord(userId, 3);
userMessageBitMap.pushRecord(userId, 4);
userMessageBitMap.pushRecord(userId, 6);
userMessageBitMap.pushRecord(userId, 8);
userMessageBitMap.pushRecord(userId, 10);
userMessageBitMap.pushRecord(userId, 11);
Boolean readableOne = userMessageBitMap.isReadable(userId, 1);
Assertions.assertFalse(readableOne);
Boolean readableTwo = userMessageBitMap.isReadable(userId, 2);
Assertions.assertTrue(readableTwo);
Boolean readableNull = userMessageBitMap.isReadable(2L, 1);
Assertions.assertFalse(readableNull);
}
@Test
void readMessageAndCheckHasBeenRead() throws IOException {
userMessageBitMap.readRecord(userId, 8);
Boolean readEight = userMessageBitMap.hasBeenRead(userId, 8);
Assertions.assertTrue(readEight);
Boolean readTen = userMessageBitMap.hasBeenRead(userId, 10);
Assertions.assertFalse(readTen);
Boolean readNull = userMessageBitMap.hasBeenRead(2L, 1);
Assertions.assertFalse(readNull);
userMessageBitMap.readRecord(2L, 1);
Boolean readNotNull = userMessageBitMap.hasBeenRead(2L, 1);
Assertions.assertTrue(readNotNull);
}
@Test
void getLastMessageList() throws IOException {
Integer pageNo = 1;
List<Integer> messageList1 = userMessageBitMap.getUserLastPushMessage(userId, 3, pageNo);
Assertions.assertArrayEquals(new Integer[]{11, 10, 8}, messageList1.toArray());
pageNo++;
List<Integer> messageList2 = userMessageBitMap.getUserLastPushMessage(userId, 3, pageNo);
Assertions.assertArrayEquals(new Integer[]{6, 4, 3}, messageList2.toArray());
pageNo++;
List<Integer> messageList3 = userMessageBitMap.getUserLastPushMessage(userId, 3, pageNo);
Assertions.assertArrayEquals(new Integer[]{2}, messageList3.toArray());
List<Integer> messageList0 = userMessageBitMap.getUserLastPushMessage(userId, 3, 0);
Assertions.assertEquals(0, messageList0.size());
}
}
Redis实现:
@SpringBootTest
@Slf4j
class RedisUserMessageBitMapTest {
@Autowired
private RedisUserMessageBitMap userMessageBitMap;
private Long userId = 1L;
@Test
void pushMessageAndCheckReadable() throws IOException {
userMessageBitMap.pushRecord(userId, 2);
userMessageBitMap.pushRecord(userId, 3);
userMessageBitMap.pushRecord(userId, 4);
userMessageBitMap.pushRecord(userId, 6);
userMessageBitMap.pushRecord(userId, 8);
userMessageBitMap.pushRecord(userId, 10);
userMessageBitMap.pushRecord(userId, 11);
Boolean readableOne = userMessageBitMap.isReadable(userId, 1);
Assertions.assertFalse(readableOne);
Boolean readableTwo = userMessageBitMap.isReadable(userId, 2);
Assertions.assertTrue(readableTwo);
}
@Test
void readMessageAndCheckHasBeenRead() throws IOException {
userMessageBitMap.readRecord(userId, 8);
Boolean readEight = userMessageBitMap.hasBeenRead(userId, 8);
Assertions.assertTrue(readEight);
Boolean readTen = userMessageBitMap.hasBeenRead(userId, 10);
Assertions.assertFalse(readTen);
}
@Test
void getLastMessageList() throws IOException {
Integer maxMessageId = 11;
List<Integer> messageList1 = userMessageBitMap.getUserLastPushMessage(userId, 3, maxMessageId);
Assertions.assertArrayEquals(new Integer[]{11, 10, 8}, messageList1.toArray());
for (Integer messageId : messageList1) {
maxMessageId = Math.min(maxMessageId, messageId);
}
List<Integer> messageList2 = userMessageBitMap.getUserLastPushMessage(userId, 3, maxMessageId - 1);
Assertions.assertArrayEquals(new Integer[]{6, 4, 3}, messageList2.toArray());
for (Integer messageId : messageList2) {
maxMessageId = Math.min(maxMessageId, messageId);
}
List<Integer> messageList3 = userMessageBitMap.getUserLastPushMessage(userId, 3, maxMessageId - 1);
Assertions.assertArrayEquals(new Integer[]{2}, messageList3.toArray());
}
}
调整下执行顺序,最终能实现100%的测试覆盖。