Springboot项目redisTemplate实现轻量级消息队列

老王Plus 2019-04-09 13:52:16 ⋅ 612 阅读

作者:wangzaiplus

地址:https://www.cnblogs.com/wangzaiplus/p/10660520.html

背景
公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用ActiveMQ等消息队列中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发。

一、本文涉及知识点

  • excel文件读写--阿里easyexcel sdk

  • 文件上传、下载--腾讯云对象存储

  • 远程服务调用--restTemplate

  • 生产者、消费者--redisTemplate leftPush和rightPop操作

  • 异步处理数据--Executors线程池

  • 读取网络文件流--HttpClient

  • 自定义注解实现用户身份认证--JWT token认证, 拦截器拦截标注有@LoginRequired注解的请求入口

当然, Java实现咯
涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习

二、项目目录结构

说明: 数据库DAO层放到另一个模块了, 不是本文重点

三、主要maven依赖

  • easyexcel

<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>${easyexcel-latestVersion}</version>
</dependency>
  • JWT

        <dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt</artifactId>
<version>0.7.0</version>
</dependency>
  • redis

        <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
  • 腾讯cos

        <dependency>
<groupId>com.qcloud</groupId>
<artifactId>cos_api</artifactId>
<version>5.4.5</version>
</dependency>

四、流程

  • 用户上传文件

  • 将文件存储到腾讯cos

  • 将上传后的文件id及上传记录保存到数据库

  • redis生产一条导入消息, 即保存文件id到redis

  • 请求结束, 返回"处理中"状态

  • redis消费消息

  • 读取cos文件, 异步处理数据

  • 将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为"处理完成"

  • 客户端轮询查询处理状态, 并可以下载错误文件

  • 结束

五、实现效果

  • 上传文件

  • 数据库导入记录

  • 导入的数据

  • 下载错误文件

  • 错误数据提示

  • 查询导入记录

六、代码实现

1、导入excel控制层

    @LoginRequired
@RequestMapping(value = "doImport", method = RequestMethod.POST)
public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
PLUser user = getUser(request);
return orderImportService.doImport(file, user.getId());
}

2、service层

    @Override
public JsonResponse doImport(MultipartFile file, Integer userId) {
if (null == file || file.isEmpty()) {
throw new ServiceException("文件不能为空");
}

String filename = file.getOriginalFilename();
if (!checkFileSuffix(filename)) {
throw new ServiceException("当前仅支持xlsx格式的excel");
}

// 存储文件
String fileId = saveToOss(file);
if (StringUtils.isBlank(fileId)) {
throw new ServiceException("文件上传失败, 请稍后重试");
}

// 保存记录到数据库
saveRecordToDB(userId, fileId, filename);

// 生产一条订单导入消息
redisProducer.produce(RedisKey.orderImportKey, fileId);

return JsonResponse.ok("导入成功, 处理中...");
}

/**
* 校验文件格式
* @param fileName
* @return
*/

private static boolean checkFileSuffix(String fileName) {
if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
return false;
}

int pointIndex = fileName.lastIndexOf(".");
String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();
if (".xlsx".equals(suffix)) {
return true;
}

return false;
}

/**
* 将文件存储到腾讯OSS
* @param file
* @return
*/

private String saveToOss(MultipartFile file) {
InputStream ins = null;
try {
ins = file.getInputStream();
} catch (IOException e) {
e.printStackTrace();
}

String fileId;
try {
String originalFilename = file.getOriginalFilename();
File f = new File(originalFilename);
inputStreamToFile(ins, f);
FileSystemResource resource = new FileSystemResource(f);

MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
param.add("file", resource);

ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
fileId = (String) responseResult.getData();
} catch (Exception e) {
fileId = null;
}

return fileId;
}

3、redis生产者

@Service
public class RedisProducerImpl implements RedisProducer {

@Autowired
private RedisTemplate redisTemplate;

@Override
public JsonResponse produce(String key, String msg) {
Map<String, String> map = Maps.newHashMap();
map.put("fileId", msg);
redisTemplate.opsForList().leftPush(key, map);
return JsonResponse.ok();
}

}

4、redis消费者

@Service
public class RedisConsumer {

@Autowired
public RedisTemplate redisTemplate;

@Value("${txOssFileUrl}")
private String txOssFileUrl;

@Value("${txOssUploadUrl}")
private String txOssUploadUrl;

@PostConstruct
public void init() {
processOrderImport();
}

/**
* 处理订单导入
*/

private void processOrderImport() {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
while (true) {
Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
if (null == object) {
continue;
}
String msg = JSON.toJSONString(object);
executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
}
});
}

}

5、处理任务线程类

public class OrderImportTask implements Runnable {
public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
this.msg = msg;
this.txOssFileUrl = txOssFileUrl;
this.txOssUploadUrl = txOssUploadUrl;
}
}

/**
* 注入bean
*/

private void autowireBean() {
this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
}

@Override
public void run() {
// 注入bean
autowireBean();

JSONObject jsonObject = JSON.parseObject(msg);
String fileId = jsonObject.getString("fileId");

MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
param.add("id", fileId);

ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
String fileUrl = (String) responseResult.getData();
if (StringUtils.isBlank(fileUrl)) {
return;
}

InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
List<Object> list = ExcelUtil.read(inputStream);
process(list, fileId);
}

/**
* 将文件上传至oss
* @param file
* @return
*/

private String saveToOss(File file) {
String fileId;
try {
FileSystemResource resource = new FileSystemResource(file);
MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
param.add("file", resource);

ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
fileId = (String) responseResult.getData();
} catch (Exception e) {
fileId = null;
}
return fileId;
}

说明: 处理数据的业务逻辑代码就不用贴了

6、上传文件到cos

    @RequestMapping("/txOssUpload")
@ResponseBody
public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
if (null == file || file.isEmpty()) {
return ResponseResult.fail("文件不能为空");
}

String originalFilename = file.getOriginalFilename();
originalFilename = MimeUtility.decodeText(originalFilename);// 解决中文乱码问题
String contentType = getContentType(originalFilename);
String key;

InputStream ins = null;
File f = null;

try {
ins = file.getInputStream();
f = new File(originalFilename);
inputStreamToFile(ins, f);
key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
} catch (Exception e) {
return ResponseResult.fail(e.getMessage());
} finally {
if (null != ins) {
try {
ins.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (f.exists()) {// 删除临时文件
f.delete();
}
}

return ResponseResult.ok(key);
}

public static void inputStreamToFile(InputStream ins,File file) {
try {
OutputStream os = new FileOutputStream(file);
int bytesRead = 0;
byte[] buffer = new byte[8192];
while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
os.write(buffer, 0, bytesRead);
}
os.close();
ins.close();
} catch (Exception e) {
e.printStackTrace();
}
}

public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
key = Uuid.getUuid() + "-" + key;
OSSUtil.txOssUpload(inputStream, key, contentType);
try {
if (null != inputStream) {
inputStream.close();
}
} catch (IOException e) {
e.printStackTrace();
}
return key;
}

public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
ObjectMetadata objectMetadata = new ObjectMetadata();
try{
int length = inputStream.available();
objectMetadata.setContentLength(length);
}catch (Exception e){
logger.info(e.getMessage());
}
objectMetadata.setContentType(contentType);
cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
}

7、下载文件

    /**
* 腾讯云文件下载
* @param response
* @param id
* @return
*/
@RequestMapping("/txOssDownload")
public Object txOssDownload(HttpServletResponse response, String id) {
COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
String contentType = getContentType(id);
FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
return null;
}

public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
FileOutputStream fos = null;
response.reset();
OutputStream os = null;
try {
response.setContentType(contentType + "; charset=utf-8");
if(!contentType.equals(PlConstans.FileContentType.image)){
try {
response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
} catch (UnsupportedEncodingException e) {
response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
logger.error("encoding file name failed", e);
}
}

os = response.getOutputStream();

byte[] b = new byte[1024 * 1024];
int len;
while ((len = fileStream.read(b)) > 0) {
os.write(b, 0, len);
os.flush();
try {
if(fos != null) {
fos.write(b, 0, len);
fos.flush();
}
} catch (Exception e) {
logger.error(e.getMessage());
}
}
} catch (IOException e) {
IOUtils.closeQuietly(fos);
fos = null;
} finally {
IOUtils.closeQuietly(os);
IOUtils.closeQuietly(fileStream);
if(fos != null) {
IOUtils.closeQuietly(fos);
}
}
}

8、读取网络文件流

    /**
* 读取网络文件流
* @param url
* @return
*/

public static InputStream readFileFromURL(String url) {
if (StringUtils.isBlank(url)) {
return null;
}

HttpClient httpClient = new DefaultHttpClient();
HttpGet methodGet = new HttpGet(url);
try {
HttpResponse response = httpClient.execute(methodGet);
if (response.getStatusLine().getStatusCode() == 200) {
HttpEntity entity = response.getEntity();
return entity.getContent();
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

9、ExcelUtil

    /**
* 读excel
* @param inputStream 文件输入流
* @return list集合
*/

public static List<Object> read(InputStream inputStream) {
return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
}

/**
* 写excel
* @param data list数据
* @param clazz
* @param saveFilePath 文件保存路径
* @throws IOException
*/

public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
File tempFile = new File(saveFilePath);
OutputStream out = new FileOutputStream(tempFile);
ExcelWriter writer = EasyExcelFactory.getWriter(out);
Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
writer.write(data, sheet);
writer.finish();
out.close();
}

说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考

七、其他

1、@LoginRequired注解

/**
* 在需要登录验证的Controller的方法上使用此注解
*/

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}

2、MyControllerAdvice

@ControllerAdvice
public class MyControllerAdvice {

@ResponseBody
@ExceptionHandler(TokenValidationException.class)
public JsonResponse tokenValidationExceptionHandler() {
return JsonResponse.loginInvalid();
}

@ResponseBody
@ExceptionHandler(ServiceException.class)
public JsonResponse serviceExceptionHandler(ServiceException se) {
return JsonResponse.fail(se.getMsg());
}

@ResponseBody
@ExceptionHandler(Exception.class)
public JsonResponse exceptionHandler(Exception e) {
e.printStackTrace();
return JsonResponse.fail(e.getMessage());
}

}

3、AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {

private static final String CURRENT_USER = "user";

@Autowired
private UserService userService;

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
// 如果不是映射到方法直接通过
if (!(handler instanceof HandlerMethod)) {
return true;
}
HandlerMethod handlerMethod = (HandlerMethod) handler;
Method method = handlerMethod.getMethod();

// 判断接口是否有@LoginRequired注解, 有则需要登录
LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
if (methodAnnotation != null) {
// 验证token
Integer userId = JwtUtil.verifyToken(request);
PLUser plUser = userService.selectByPrimaryKey(userId);
if (null == plUser) {
throw new RuntimeException("用户不存在,请重新登录");
}
request.setAttribute(CURRENT_USER, plUser);
return true;
}
return true;
}

@Override
public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
}

@Override
public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
}
}

4、JwtUtil

    public static final long EXPIRATION_TIME = 2592_000_000L; // 有效期30天
public static final String SECRET = "pl_token_secret";
public static final String HEADER = "token";
public static final String USER_ID = "userId";

/**
* 根据userId生成token
* @param userId
* @return
*/

public static String generateToken(String userId) {
HashMap<String, Object> map = new HashMap<>();
map.put(USER_ID, userId);
String jwt = Jwts.builder()
.setClaims(map)
.setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
.signWith(SignatureAlgorithm.HS512, SECRET)
.compact();
return jwt;
}

/**
* 验证token
* @param request
* @return 验证通过返回userId
*/

public static Integer verifyToken(HttpServletRequest request) {
String token = request.getHeader(HEADER);
if (token != null) {
try {
Map<String, Object> body = Jwts.parser()
.setSigningKey(SECRET)
.parseClaimsJws(token)
.getBody();

for (Map.Entry entry : body.entrySet()) {
Object key = entry.getKey();
Object value = entry.getValue();
if (key.toString().equals(USER_ID)) {
return Integer.valueOf(value.toString());// userId
}
}
return null;
} catch (Exception e) {
logger.error(e.getMessage());
throw new TokenValidationException("unauthorized");
}
} else {
throw new TokenValidationException("missing token");
}
}


---------------END----------------

后续的内容同样精彩

长按关注“IT实战联盟”哦






全部评论: 0

    我有话说:

    消息队列常见问题(二):消息队列产生大量的消息堆积怎么解决?

    上一节列举了生产上消息队列产生大量的消息堆积会有哪些后果,那相对应的解决方法有哪些呢?1、消息被丢弃情况如果要实现防止消息过期问题,最好不要设置过期时间!那设置了过期时间导致消息丢失怎么补救呢?答案

    腾讯自研高吞吐消息队列组件TubeMQ升级为 TubeHub

    TubeMQ简介 TubeMQ 项目始于 2013 年,是腾讯自研的高吞吐消息队列组件。项目团队于 2019 年将 TubeMQ 捐赠给 Apache 基金会,成为腾讯首个被 Apache 基金会

    消息队列常见问题(一):生产上消息队列产生大量的消息堆积会有什么后果?

    大多数消息堆积原因是Consumer出现了问题,并且没有被运维/开发监控到即使修复问题,导致大量的消息都积压在 MQ 中,那么会造成哪些后果呢?1、消息被丢弃例如 RabbitMQ 中的一条消息设置

    SpringBoot+zk+dubbo架构实践(一):本地部署zookeeper

    SpringBoot+zk+dubbo架构实践系列实现目标:自己动手搭建微服务架构

    「尝鲜」SpringBoot 快速整合Swagger 3.0

    第一步:Maven引入Swagger3.0 starter依赖 Maven项目中引入springfox-boot-starter依赖: <dependency> <

    架构实战篇(九):Spring Boot 集成 RocketMQ

    快速集成阿里开源消息队列 RocketMQ

    Node&RabbitMQ系列二 延迟|死信队列

      前提 目前项目中采用ts+eggjs结合的方式,针对定时任务,采用schedule,随着业务的增多,觉得缺点啥,可能就是缺消息队列吧。上一篇文章,针对rabbitmq的基本语法进行了

    SpringBoot+zk+dubbo架构实践(二):SpringBoot 集成 zookeeper

    不啰嗦,本篇完成两件事:1、搭建SpringBoot 框架;2、基于spring boot框架访问zookeeper。

    「转载」蘑菇街消息系统上云实践

    小编又来啦~本周要推荐给大家的是一篇跟中间件上云相关的技术文章,这里面详细的记录了,蘑菇街自研消息系统上云的全过程,也是市面上开放出来为数不多的企业自研组件上云实践。有相关需求的同学可以好好学习下

    分享一个标星42.4k 的商城管理后台项目模板

    项目简介 mall项目是一套电商系统,包括前台商城系统及后台管理系统,基于SpringBoot+MyBatis实现,采用Docker容器化部署。 前台商城系统包含首页门户、商品推荐、商品搜索、商品

    Node实战篇:阶段项目(九)

    项目整体预览 项目的github地址 界面逼格还行-_- 主要功能: 登陆; 退出; 所用的主要模块: express, 路由.静态文件.模块分工等; express-session, 采用

    老板说:明天来加班写个FCM消息推送功能......

    基于Spring Boot集成Firebase实现FCM消息推送功能

    【开源资讯】Spring Boot 2.4.0.M4 发布

    Spring Boot 2.4.0 的第四个里程碑版本发布了,可以从里程碑仓库获取。此版本包含 145 项更新内容,亮点如下:1、改进故障分析器(Failure Analyzer)2、能够发布由 Maven 和 Gradle ...

    【超全教程】SpringBoot 2.3.x 分层构建 Docker 镜像实践

    作者:超级小豆丁 http://www.mydlq.club/article/98/ 目录 什么是镜像分层 SpringBoot 2.3.x 新增对分层的支持 创建测试的 SpringBoot

    微服务架构实战篇:快速入手SpringBoot 2.0,欢迎入坑哦~~~

    SpringBoot 2.0 基本要求Java最低要求8以上,不再支持Java 6 和 7等低版本。

    架构实战篇(十六):Spring Boot Assembly服务化打包

    使用assembly来打包springboot微服务项目,让发布更简单