黑马头条项目详解(三)

黑马头条项目详解(三)定时任务 有固定周期的 有明确的触发时间延迟任务 没有固定的开始时间 它常常是由一个事件触发的 而在这个事件触发之后的一段时间内触发另一个事件 任务可以立即执行 也可以延迟应用场景场景一 订单

大家好,欢迎来到IT知识分享网。

五、文章定时发布—-延迟任务

黑马头条项目详解(三)

什么是延迟任务?

定时任务:有固定周期的,有明确的触发时间

延迟任务:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟 

黑马头条项目详解(三)

应用场景

场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消

场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止

技术选型

1、DelayQueue

JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。

黑马头条项目详解(三)

DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法

getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。

compareTo方法:用于排序,确定元素出队列的顺序。

2、RabbitMQ实现延迟任务

TTL:Time To Live (消息存活时间)

死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)

黑马头条项目详解(三)

3、redis实现

zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序

黑马头条项目详解(三)

 5.1、redis实现延迟任务的思路

黑马头条项目详解(三)

问题思路

1.为什么任务需要存储在数据库中?

延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。

2.为什么redis中使用两种数据类型,list和zset?

黑马头条项目详解(三)

3.在添加zset数据的时候,为什么需要预加载?

任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。

5.2、表结构

表结构

taskinfo 任务表

黑马头条项目详解(三)

taskinfo_logs 任务日志表 

黑马头条项目详解(三)

 5.3、延迟任务—添加任务

①创建task类,用于接收添加任务的参数

package com.heima.model.schedule.dtos; import lombok.Data; import java.io.Serializable; @Data public class Task implements Serializable { / * 任务id */ private Long taskId; / * 类型 */ private Integer taskType; / * 优先级 */ private Integer priority; / * 执行id */ private long executeTime; / * task参数 */ private byte[] parameters; }

②创建TaskService

package com.heima.schedule.service; import com.heima.model.schedule.dtos.Task; / * 对外访问接口 */ public interface TaskService { / * 添加任务 * @param task 任务对象 * @return 任务id */ public long addTask(Task task) ; }

实现类

package com.heima.schedule.service.impl; import com.alibaba.fastjson.JSON; import com.heima.common.constants.ScheduleConstants; import com.heima.common.redis.CacheService; import com.heima.model.schedule.dtos.Task; import com.heima.model.schedule.pojos.Taskinfo; import com.heima.model.schedule.pojos.TaskinfoLogs; import com.heima.schedule.mapper.TaskinfoLogsMapper; import com.heima.schedule.mapper.TaskinfoMapper; import com.heima.schedule.service.TaskService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Calendar; import java.util.Date; @Service @Transactional @Slf4j public class TaskServiceImpl implements TaskService { / * 添加延迟任务 * * @param task * @return */ @Override public long addTask(Task task) { //1.添加任务到数据库中 boolean success = addTaskToDb(task); if (success) { //2.添加任务到redis addTaskToCache(task); } return task.getTaskId(); } @Autowired private CacheService cacheService; / * 把任务添加到redis中 * * @param task */ private void addTaskToCache(Task task) { String key = task.getTaskType() + "_" + task.getPriority(); //获取5分钟之后的时间 毫秒值 Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); long nextScheduleTime = calendar.getTimeInMillis(); //2.1 如果任务的执行时间小于等于当前时间,存入list if (task.getExecuteTime() <= System.currentTimeMillis()) { cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task)); } else if (task.getExecuteTime() <= nextScheduleTime) { //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中 cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime()); } } @Autowired private TaskinfoMapper taskinfoMapper; @Autowired private TaskinfoLogsMapper taskinfoLogsMapper; / * 添加任务到数据库中 * * @param task * @return */ private boolean addTaskToDb(Task task) { boolean flag = false; try { //保存任务表 Taskinfo taskinfo = new Taskinfo(); BeanUtils.copyProperties(task, taskinfo); taskinfo.setExecuteTime(new Date(task.getExecuteTime())); taskinfoMapper.insert(taskinfo); //设置taskID task.setTaskId(taskinfo.getTaskId()); //保存任务日志数据 TaskinfoLogs taskinfoLogs = new TaskinfoLogs(); BeanUtils.copyProperties(taskinfo, taskinfoLogs); taskinfoLogs.setVersion(1); taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED); taskinfoLogsMapper.insert(taskinfoLogs); flag = true; } catch (Exception e) { e.printStackTrace(); } return flag; } }

ScheduleConstants常量类

package com.heima.common.constants; public class ScheduleConstants { //task状态 public static final int SCHEDULED=0; //初始化状态 public static final int EXECUTED=1; //已执行状态 public static final int CANCELLED=2; //已取消状态 public static String FUTURE="future_"; //未来数据key前缀 public static String TOPIC="topic_"; //当前数据key前缀 }

5.4、延迟任务—取消任务

黑马头条项目详解(三)

在TaskService中添加方法

/ * 取消任务 * @param taskId 任务id * @return 取消结果 */ public boolean cancelTask(long taskId); 

 实现

/ * 取消任务 * @param taskId * @return */ @Override public boolean cancelTask(long taskId) { boolean flag = false; //删除任务,更新日志 Task task = updateDb(taskId,ScheduleConstants.EXECUTED); //删除redis的数据 if(task != null){ removeTaskFromCache(task); flag = true; } return false; } / * 删除redis中的任务数据 * @param task */ private void removeTaskFromCache(Task task) { String key = task.getTaskType()+"_"+task.getPriority(); if(task.getExecuteTime()<=System.currentTimeMillis()){ cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task)); }else { cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task)); } } / * 删除任务,更新任务日志状态 * @param taskId * @param status * @return */ private Task updateDb(long taskId, int status) { Task task = null; try { //删除任务 taskinfoMapper.deleteById(taskId); TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId); taskinfoLogs.setStatus(status); taskinfoLogsMapper.updateById(taskinfoLogs); task = new Task(); BeanUtils.copyProperties(taskinfoLogs,task); task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime()); }catch (Exception e){ log.error("task cancel exception taskid={}",taskId); } return task; }

5.5、延迟任务—消费任务

①在TaskService中添加方法

/ * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ public Task poll(int type,int priority);

实现

/ * 按照类型和优先级拉取任务 * @return */ @Override public Task poll(int type,int priority) { Task task = null; try { String key = type+"_"+priority; String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key); if(StringUtils.isNotBlank(task_json)){ task = JSON.parseObject(task_json, Task.class); //更新数据库信息 updateDb(task.getTaskId(),ScheduleConstants.EXECUTED); } }catch (Exception e){ e.printStackTrace(); log.error("poll task exception"); } return task; }

5.6、延迟任务—未来数据定时刷新,zset同步至list

黑马头条项目详解(三)

 redis的key值匹配

方案1:keys 模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞

黑马头条项目详解(三)

方案2:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。

黑马头条项目详解(三)

redis通道

普通redis客户端和服务器交互模式

黑马头条项目详解(三)Pipeline请求模型

黑马头条项目详解(三)

功能实现

 ①在TaskService中添加方法

@Scheduled(cron = "0 */1 * * * ?") public void refresh() { System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务"); // 获取所有未来数据集合的key值 Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_* for (String futureKey : futureKeys) { // future_250_250 String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1]; //获取该组key下当前需要消费的任务数据 Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); if (!tasks.isEmpty()) { //将这些任务数据添加到消费者队列中 cacheService.refreshWithPipeline(futureKey, topicKey, tasks); System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下"); } } }

②在引导类中添加开启任务调度注解:`@EnableScheduling`

5.7、分布式锁解决集群下的方法抢占执行

问题描述

启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法 

黑马头条项目详解(三)

分布式锁

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。  

黑马头条项目详解(三)

redis实现分布式锁

sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。 

黑马头条项目详解(三)

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
  • 客户端A执行代码完成,删除锁
  • 客户端B在等待一段时间后再去请求设置key的值,设置成功
  • 客户端B执行代码完成,删除锁

实现

在工具类CacheService中添加方法

/ * 加锁 * * @param name * @param expire * @return */ public String tryLock(String name, long expire) { name = name + "_lock"; String token = UUID.randomUUID().toString(); RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory(); RedisConnection conn = factory.getConnection(); try { //参考redis命令: //set key value [EX seconds] [PX milliseconds] [NX|XX] Boolean result = conn.set( name.getBytes(), token.getBytes(), Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT //NX ); if (result != null && result) return token; } finally { RedisConnectionUtils.releaseConnection(conn, factory,false); } return null; }

修改未来数据定时刷新的方法,如下:

/ * 未来数据定时刷新 */ @Scheduled(cron = "0 */1 * * * ?") public void refresh(){ String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30); if(StringUtils.isNotBlank(token)){ log.info("未来数据定时刷新---定时任务"); //获取所有未来数据的集合key Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); for (String futureKey : futureKeys) {//future_100_50 //获取当前数据的key topic String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1]; //按照key和分值查询符合条件的数据 Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); //同步数据 if(!tasks.isEmpty()){ cacheService.refreshWithPipeline(futureKey,topicKey,tasks); log.info("成功的将"+futureKey+"刷新到了"+topicKey); } } } }

5.8、数据库任务定时同步到redis

黑马头条项目详解(三)

@Scheduled(cron = "0 */5 * * * ?") @PostConstruct public void reloadData() { clearCache(); log.info("数据库数据同步到缓存"); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); //查看小于未来5分钟的所有任务 List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime())); if(allTasks != null && allTasks.size() > 0){ for (Taskinfo taskinfo : allTasks) { Task task = new Task(); BeanUtils.copyProperties(taskinfo,task); task.setExecuteTime(taskinfo.getExecuteTime().getTime()); addTaskToCache(task); } } } private void clearCache(){ // 删除缓存中未来数据集合和当前消费者队列的所有key Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_ Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_ cacheService.delete(futurekeys); cacheService.delete(topickeys); }

 5.9、延迟队列解决精准时间发布文章

①延迟队列服务提供对外接口 

提供远程的feign接口,在heima-leadnews-feign-api编写类如下: 

package com.heima.apis.schedule; import com.heima.model.common.dtos.ResponseResult; import com.heima.model.schedule.dtos.Task; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @FeignClient("leadnews-schedule") public interface IScheduleClient { / * 添加任务 * @param task 任务对象 * @return 任务id */ @PostMapping("/api/v1/task/add") public ResponseResult addTask(@RequestBody Task task); / * 取消任务 * @param taskId 任务id * @return 取消结果 */ @GetMapping("/api/v1/task/cancel/{taskId}") public ResponseResult cancelTask(@PathVariable("taskId") long taskId); / * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority); }

 在heima-leadnews-schedule微服务下提供对应的实现

package com.heima.schedule.feign; import com.heima.apis.schedule.IScheduleClient; import com.heima.model.common.dtos.ResponseResult; import com.heima.model.schedule.dtos.Task; import com.heima.schedule.service.TaskService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RestController public class ScheduleClient implements IScheduleClient { @Autowired private TaskService taskService; / * 添加任务 * @param task 任务对象 * @return 任务id */ @PostMapping("/api/v1/task/add") @Override public ResponseResult addTask(@RequestBody Task task) { return ResponseResult.okResult(taskService.addTask(task)); } / * 取消任务 * @param taskId 任务id * @return 取消结果 */ @GetMapping("/api/v1/task/cancel/{taskId}") @Override public ResponseResult cancelTask(@PathVariable("taskId") long taskId) { return ResponseResult.okResult(taskService.cancelTask(taskId)); } / * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") @Override public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) { return ResponseResult.okResult(taskService.poll(type,priority)); } }

②发布文章集成添加延迟队列接口

在创建WmNewsTaskService

package com.heima.wemedia.service; import com.heima.model.wemedia.pojos.WmNews; public interface WmNewsTaskService { / * 添加任务到延迟队列中 * @param id 文章的id * @param publishTime 发布的时间 可以做为任务的执行时间 */ public void addNewsToTask(Integer id, Date publishTime); }

实现

package com.heima.wemedia.service.impl; import com.heima.apis.schedule.IScheduleClient; import com.heima.model.common.enums.TaskTypeEnum; import com.heima.model.schedule.dtos.Task; import com.heima.model.wemedia.pojos.WmNews; import com.heima.utils.common.ProtostuffUtil; import com.heima.wemedia.service.WmNewsTaskService; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Service @Slf4j public class WmNewsTaskServiceImpl implements WmNewsTaskService { @Autowired private IScheduleClient scheduleClient; / * 添加任务到延迟队列中 * @param id 文章的id * @param publishTime 发布的时间 可以做为任务的执行时间 */ @Override @Async public void addNewsToTask(Integer id, Date publishTime) { log.info("添加任务到延迟服务中----begin"); Task task = new Task(); task.setExecuteTime(publishTime.getTime()); task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType()); task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); WmNews wmNews = new WmNews(); wmNews.setId(id); task.setParameters(ProtostuffUtil.serialize(wmNews)); scheduleClient.addTask(task); log.info("添加任务到延迟服务中----end"); } }

枚举类

package com.heima.model.common.enums; import lombok.AllArgsConstructor; import lombok.Getter; @Getter @AllArgsConstructor public enum TaskTypeEnum { NEWS_SCAN_TIME(1001, 1,"文章定时审核"), REMOTEERROR(1002, 2,"第三方接口调用失败,重试"); private final int taskType; //对应具体业务 private final int priority; //业务不同级别 private final String desc; //描述信息 }

修改发布文章代码:

把之前的异步调用修改为调用延迟任务

@Autowired private WmNewsTaskService wmNewsTaskService; / * 发布修改文章或保存为草稿 * @param dto * @return */ @Override public ResponseResult submitNews(WmNewsDto dto) { //0.条件判断 if(dto == null || dto.getContent() == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //1.保存或修改文章 WmNews wmNews = new WmNews(); //属性拷贝 属性名词和类型相同才能拷贝 BeanUtils.copyProperties(dto,wmNews); //封面图片 list---> string if(dto.getImages() != null && dto.getImages().size() > 0){ //[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg String imageStr = StringUtils.join(dto.getImages(), ","); wmNews.setImages(imageStr); } //如果当前封面类型为自动 -1 if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){ wmNews.setType(null); } saveOrUpdateWmNews(wmNews); //2.判断是否为草稿 如果为草稿结束当前方法 if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){ return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); } //3.不是草稿,保存文章内容图片与素材的关系 //获取到文章内容中的图片信息 List<String> materials = ectractUrlInfo(dto.getContent()); saveRelativeInfoForContent(materials,wmNews.getId()); //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片 saveRelativeInfoForCover(dto,wmNews,materials); //审核文章 // wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime()); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }

③消费任务进行审核文章

WmNewsTaskService中添加方法

/ * 消费延迟队列数据 */ public void scanNewsByTask();

实现

@Autowired private WmNewsAutoScanServiceImpl wmNewsAutoScanService; / * 消费延迟队列数据 */ @Scheduled(fixedRate = 1000) @Override @SneakyThrows public void scanNewsByTask() { log.info("文章审核---消费任务执行---begin---"); ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); if(responseResult.getCode().equals(200) && responseResult.getData() != null){ String json_str = JSON.toJSONString(responseResult.getData()); Task task = JSON.parseObject(json_str, Task.class); byte[] parameters = task.getParameters(); WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class); System.out.println(wmNews.getId()+"-----------"); wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); } log.info("文章审核---消费任务执行---end---"); }

在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling

六、自媒体文章上下架

需求

黑马头条项目详解(三)

思路

 黑马头条项目详解(三)

会产生耦合 

黑马头条项目详解(三)

使用MQ可以解耦 

6.1、消息中间件

黑马头条项目详解(三)

黑马头条项目详解(三)

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统  

黑马头条项目详解(三)

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

6.2、kafka安装配置

 Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

  • Docker安装zookeeper

下载镜像

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
  • Docker安装kafka

 下载镜像

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \ --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \ --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \ --net=host wurstmeister/kafka:2.12-2.3.1

6.3、kafka入门

黑马头条项目详解(三)

生产者发送消息

黑马头条项目详解(三)

 生产者发送消息

package com.heima.kafka.sample; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; / * 生产者 */ public class ProducerQuickStart { public static void main(String[] args) { //1.kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092"); //发送失败,失败的重试次数 properties.put(ProducerConfig.RETRIES_CONFIG,5); //消息key的序列化器 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //消息value的序列化器 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //2.生产者对象 KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties); //封装发送的消息 / 第一个参数topic;第二个参数消息的key;第三个参数消息的value */ ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","","hello kafka"); //3.发送消息 producer.send(record); //4.关闭消息通道,必须关闭,否则消息发送不成功 producer.close(); } }

消费者接收消息

黑马头条项目详解(三)

package com.heima.kafka.sample; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; / * 消费者 */ public class ConsumerQuickStart { public static void main(String[] args) { //1.添加kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092"); //消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2"); //消息的反序列化器 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //2.消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); //3.订阅主题 consumer.subscribe(Collections.singletonList("itheima-topic")); //当前线程一直处于监听状态 while (true) { //4.获取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.key()); System.out.println(consumerRecord.value()); } } } }

 同一个消费者组下的消费者只有一个能消费到topic黑马头条项目详解(三)

黑马头条项目详解(三)

分区

黑马头条项目详解(三)

分区策略

黑马头条项目详解(三)

6.4、kafka高可用设计

 集群

黑马头条项目详解(三)

备份机制

黑马头条项目详解(三)

黑马头条项目详解(三)

ISR(in-sync replica)需要同步复制保存的follower

如果leader失效后,需要选出新的leader,选举的原则如下:

第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的

第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取

极端情况,就是所有副本都失效了,这时有两种方案

第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定

第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整

6.5、kafka生产者详解

发送类型

同步发送

使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

RecordMetadata recordMetadata = producer.send(kvProducerRecord).get(); System.out.println(recordMetadata.offset());

 异步发送

调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

//异步消息发送 producer.send(kvProducerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e != null){ System.out.println("记录异常信息到日志表中"); } System.out.println(recordMetadata.offset()); } });

参数详解

黑马头条项目详解(三)

黑马头条项目详解(三)

黑马头条项目详解(三) 6.6、kafka消费者详解

消费者组

黑马头条项目详解(三)

  • 消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体
  • 一个发布在Topic上消息被分发给此消费者组中的一个消费者
    • 所有的消费者都在一个组中,那么这就变成了queue模型
    • 所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型

消息有序性

应用场景:

  • 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致
  • 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序

黑马头条项目详解(三)

topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。  

6.7、springboot集成kafka

①导入spring-kafka依赖信息

<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency> </dependencies>

 ②在resources下创建文件application.yml

server: port: 9991 spring: application: name: kafka-demo kafka: bootstrap-servers: 192.168.200.130:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

③消息生产者

package com.heima.kafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @GetMapping("/hello") public String hello(){ kafkaTemplate.send("itcast-topic","黑马程序员"); return "ok"; } }

④消息消费者

package com.heima.kafka.listener; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Component public class HelloListener { @KafkaListener(topics = "itcast-topic") public void onMessage(String message){ if(!StringUtils.isEmpty(message)){ System.out.println(message); } } }

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息
@GetMapping("/hello") public String hello(){ User user = new User(); user.setUsername("xiaowang"); user.setAge(18); kafkaTemplate.send("user-topic", JSON.toJSONString(user)); return "ok"; }
  • 接收消息  
package com.heima.kafka.listener; import com.alibaba.fastjson.JSON; import com.heima.kafka.pojo.User; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Component public class HelloListener { @KafkaListener(topics = "user-topic") public void onMessage(String message){ if(!StringUtils.isEmpty(message)){ User user = JSON.parseObject(message, User.class); System.out.println(user); } } }

6.8、自媒体文章上下架–自媒体端

流程图 

黑马头条项目详解(三)

接口定义

黑马头条项目详解(三)

 ①在heima-leadnews-wemedia工程下的WmNewsController新增方法

@PostMapping("/down_or_up") public ResponseResult downOrUp(@RequestBody WmNewsDto dto){ return wmNewsService.downOrUp(dto); }

在WmNewsDto中新增enable属性 ,完整的代码如下:

package com.heima.model.wemedia.dtos; import lombok.Data; import java.util.Date; import java.util.List; @Data public class WmNewsDto { private Integer id; / * 标题 */ private String title; / * 频道id */ private Integer channelId; / * 标签 */ private String labels; / * 发布时间 */ private Date publishTime; / * 文章内容 */ private String content; / * 文章封面类型 0 无图 1 单图 3 多图 -1 自动 */ private Short type; / * 提交时间 */ private Date submitedTime; / * 状态 提交为1 草稿为0 */ private Short status; / * 封面图片列表 多张图以逗号隔开 */ private List<String> images; / * 上下架 0 下架 1 上架 */ private Short enable; }

②在WmNewsService新增方法

/ * 文章的上下架 * @param dto * @return */ public ResponseResult downOrUp(WmNewsDto dto);

实现

/ * 文章的上下架 * @param dto * @return */ @Override public ResponseResult downOrUp(WmNewsDto dto) { //1.检查参数 if(dto.getId() == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //2.查询文章 WmNews wmNews = getById(dto.getId()); if(wmNews == null){ return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在"); } //3.判断文章是否已发布 if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架"); } //4.修改文章enable if(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){ update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable()) .eq(WmNews::getId,wmNews.getId())); } return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }

 6.9、自媒体文章上下架–消息通知article端文章上下架

①在heima-leadnews-common模块下导入kafka依赖

<!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>

②在自媒体端的nacos配置中心配置kafka的生产者

spring: kafka: bootstrap-servers: 192.168.200.130:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer

③在自媒体端文章上下架后发送消息

//发送消息,通知article端修改文章配置 if(wmNews.getArticleId() != null){ Map<String,Object> map = new HashMap<>(); map.put("articleId",wmNews.getArticleId()); map.put("enable",dto.getEnable()); kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map)); }

常量类

public class WmNewsMessageConstants { public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic"; }

④在article端的nacos配置中心配置kafka的消费者

spring: kafka: bootstrap-servers: 192.168.200.130:9092 consumer: group-id: ${spring.application.name} key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

⑤在article端编写监听,接收数据

package com.heima.article.listener; import com.alibaba.fastjson.JSON; import com.heima.article.service.ApArticleConfigService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @Slf4j public class ArtilceIsDownListener { @Autowired private ApArticleConfigService apArticleConfigService; @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC) public void onMessage(String message){ if(StringUtils.isNotBlank(message)){ Map map = JSON.parseObject(message, Map.class); apArticleConfigService.updateByMap(map); log.info("article端文章配置修改,articleId={}",map.get("articleId")); } } }

⑥修改ap_article_config表的数据

新建ApArticleConfigService

package com.heima.article.service; import com.baomidou.mybatisplus.extension.service.IService; import com.heima.model.article.pojos.ApArticleConfig; import java.util.Map; public interface ApArticleConfigService extends IService<ApArticleConfig> { / * 修改文章配置 * @param map */ public void updateByMap(Map map); }

实现类

package com.heima.article.service.impl; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.heima.article.mapper.ApArticleConfigMapper; import com.heima.article.service.ApArticleConfigService; import com.heima.model.article.pojos.ApArticleConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Map; @Service @Slf4j @Transactional public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService { / * 修改文章配置 * @param map */ @Override public void updateByMap(Map map) { //0 下架 1 上架 Object enable = map.get("enable"); boolean isDown = true; if(enable.equals(1)){ isDown = false; } //修改文章配置 update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown)); } }

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/109929.html

(0)
上一篇 2026-02-03 20:47
下一篇 2026-02-03 21:10

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信