大家好,欢迎来到IT知识分享网。
五、文章定时发布—-延迟任务
什么是延迟任务?
定时任务:有固定周期的,有明确的触发时间
延迟任务:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟
应用场景
场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消
场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止
技术选型
1、DelayQueue
JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法
getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。
compareTo方法:用于排序,确定元素出队列的顺序。
2、RabbitMQ实现延迟任务
TTL:Time To Live (消息存活时间)
死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)
3、redis实现
zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序
5.1、redis实现延迟任务的思路
问题思路
1.为什么任务需要存储在数据库中?
延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。
2.为什么redis中使用两种数据类型,list和zset?
3.在添加zset数据的时候,为什么需要预加载?
任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。
5.2、表结构
表结构
taskinfo 任务表
taskinfo_logs 任务日志表
5.3、延迟任务—添加任务
①创建task类,用于接收添加任务的参数
package com.heima.model.schedule.dtos; import lombok.Data; import java.io.Serializable; @Data public class Task implements Serializable { / * 任务id */ private Long taskId; / * 类型 */ private Integer taskType; / * 优先级 */ private Integer priority; / * 执行id */ private long executeTime; / * task参数 */ private byte[] parameters; }
②创建TaskService
package com.heima.schedule.service; import com.heima.model.schedule.dtos.Task; / * 对外访问接口 */ public interface TaskService { / * 添加任务 * @param task 任务对象 * @return 任务id */ public long addTask(Task task) ; }
实现类
package com.heima.schedule.service.impl; import com.alibaba.fastjson.JSON; import com.heima.common.constants.ScheduleConstants; import com.heima.common.redis.CacheService; import com.heima.model.schedule.dtos.Task; import com.heima.model.schedule.pojos.Taskinfo; import com.heima.model.schedule.pojos.TaskinfoLogs; import com.heima.schedule.mapper.TaskinfoLogsMapper; import com.heima.schedule.mapper.TaskinfoMapper; import com.heima.schedule.service.TaskService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Calendar; import java.util.Date; @Service @Transactional @Slf4j public class TaskServiceImpl implements TaskService { / * 添加延迟任务 * * @param task * @return */ @Override public long addTask(Task task) { //1.添加任务到数据库中 boolean success = addTaskToDb(task); if (success) { //2.添加任务到redis addTaskToCache(task); } return task.getTaskId(); } @Autowired private CacheService cacheService; / * 把任务添加到redis中 * * @param task */ private void addTaskToCache(Task task) { String key = task.getTaskType() + "_" + task.getPriority(); //获取5分钟之后的时间 毫秒值 Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); long nextScheduleTime = calendar.getTimeInMillis(); //2.1 如果任务的执行时间小于等于当前时间,存入list if (task.getExecuteTime() <= System.currentTimeMillis()) { cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task)); } else if (task.getExecuteTime() <= nextScheduleTime) { //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中 cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime()); } } @Autowired private TaskinfoMapper taskinfoMapper; @Autowired private TaskinfoLogsMapper taskinfoLogsMapper; / * 添加任务到数据库中 * * @param task * @return */ private boolean addTaskToDb(Task task) { boolean flag = false; try { //保存任务表 Taskinfo taskinfo = new Taskinfo(); BeanUtils.copyProperties(task, taskinfo); taskinfo.setExecuteTime(new Date(task.getExecuteTime())); taskinfoMapper.insert(taskinfo); //设置taskID task.setTaskId(taskinfo.getTaskId()); //保存任务日志数据 TaskinfoLogs taskinfoLogs = new TaskinfoLogs(); BeanUtils.copyProperties(taskinfo, taskinfoLogs); taskinfoLogs.setVersion(1); taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED); taskinfoLogsMapper.insert(taskinfoLogs); flag = true; } catch (Exception e) { e.printStackTrace(); } return flag; } }
ScheduleConstants常量类
package com.heima.common.constants; public class ScheduleConstants { //task状态 public static final int SCHEDULED=0; //初始化状态 public static final int EXECUTED=1; //已执行状态 public static final int CANCELLED=2; //已取消状态 public static String FUTURE="future_"; //未来数据key前缀 public static String TOPIC="topic_"; //当前数据key前缀 }
5.4、延迟任务—取消任务
在TaskService中添加方法
/ * 取消任务 * @param taskId 任务id * @return 取消结果 */ public boolean cancelTask(long taskId);
实现
/ * 取消任务 * @param taskId * @return */ @Override public boolean cancelTask(long taskId) { boolean flag = false; //删除任务,更新日志 Task task = updateDb(taskId,ScheduleConstants.EXECUTED); //删除redis的数据 if(task != null){ removeTaskFromCache(task); flag = true; } return false; } / * 删除redis中的任务数据 * @param task */ private void removeTaskFromCache(Task task) { String key = task.getTaskType()+"_"+task.getPriority(); if(task.getExecuteTime()<=System.currentTimeMillis()){ cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task)); }else { cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task)); } } / * 删除任务,更新任务日志状态 * @param taskId * @param status * @return */ private Task updateDb(long taskId, int status) { Task task = null; try { //删除任务 taskinfoMapper.deleteById(taskId); TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId); taskinfoLogs.setStatus(status); taskinfoLogsMapper.updateById(taskinfoLogs); task = new Task(); BeanUtils.copyProperties(taskinfoLogs,task); task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime()); }catch (Exception e){ log.error("task cancel exception taskid={}",taskId); } return task; }
5.5、延迟任务—消费任务
①在TaskService中添加方法
/ * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ public Task poll(int type,int priority);
实现
/ * 按照类型和优先级拉取任务 * @return */ @Override public Task poll(int type,int priority) { Task task = null; try { String key = type+"_"+priority; String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key); if(StringUtils.isNotBlank(task_json)){ task = JSON.parseObject(task_json, Task.class); //更新数据库信息 updateDb(task.getTaskId(),ScheduleConstants.EXECUTED); } }catch (Exception e){ e.printStackTrace(); log.error("poll task exception"); } return task; }
5.6、延迟任务—未来数据定时刷新,zset同步至list
redis的key值匹配
方案1:keys 模糊匹配
keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞
方案2:scan
SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。
redis通道
普通redis客户端和服务器交互模式

功能实现
①在TaskService中添加方法
@Scheduled(cron = "0 */1 * * * ?") public void refresh() { System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务"); // 获取所有未来数据集合的key值 Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_* for (String futureKey : futureKeys) { // future_250_250 String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1]; //获取该组key下当前需要消费的任务数据 Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); if (!tasks.isEmpty()) { //将这些任务数据添加到消费者队列中 cacheService.refreshWithPipeline(futureKey, topicKey, tasks); System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下"); } } }
②在引导类中添加开启任务调度注解:`@EnableScheduling`
5.7、分布式锁解决集群下的方法抢占执行
问题描述
启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法
分布式锁
分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。
redis实现分布式锁
sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作
- 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
- 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
- 客户端A执行代码完成,删除锁
- 客户端B在等待一段时间后再去请求设置key的值,设置成功
- 客户端B执行代码完成,删除锁
实现
在工具类CacheService中添加方法
/ * 加锁 * * @param name * @param expire * @return */ public String tryLock(String name, long expire) { name = name + "_lock"; String token = UUID.randomUUID().toString(); RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory(); RedisConnection conn = factory.getConnection(); try { //参考redis命令: //set key value [EX seconds] [PX milliseconds] [NX|XX] Boolean result = conn.set( name.getBytes(), token.getBytes(), Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT //NX ); if (result != null && result) return token; } finally { RedisConnectionUtils.releaseConnection(conn, factory,false); } return null; }
修改未来数据定时刷新的方法,如下:
/ * 未来数据定时刷新 */ @Scheduled(cron = "0 */1 * * * ?") public void refresh(){ String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30); if(StringUtils.isNotBlank(token)){ log.info("未来数据定时刷新---定时任务"); //获取所有未来数据的集合key Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); for (String futureKey : futureKeys) {//future_100_50 //获取当前数据的key topic String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1]; //按照key和分值查询符合条件的数据 Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); //同步数据 if(!tasks.isEmpty()){ cacheService.refreshWithPipeline(futureKey,topicKey,tasks); log.info("成功的将"+futureKey+"刷新到了"+topicKey); } } } }
5.8、数据库任务定时同步到redis
@Scheduled(cron = "0 */5 * * * ?") @PostConstruct public void reloadData() { clearCache(); log.info("数据库数据同步到缓存"); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); //查看小于未来5分钟的所有任务 List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime())); if(allTasks != null && allTasks.size() > 0){ for (Taskinfo taskinfo : allTasks) { Task task = new Task(); BeanUtils.copyProperties(taskinfo,task); task.setExecuteTime(taskinfo.getExecuteTime().getTime()); addTaskToCache(task); } } } private void clearCache(){ // 删除缓存中未来数据集合和当前消费者队列的所有key Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_ Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_ cacheService.delete(futurekeys); cacheService.delete(topickeys); }
5.9、延迟队列解决精准时间发布文章
①延迟队列服务提供对外接口
提供远程的feign接口,在heima-leadnews-feign-api编写类如下:
package com.heima.apis.schedule; import com.heima.model.common.dtos.ResponseResult; import com.heima.model.schedule.dtos.Task; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @FeignClient("leadnews-schedule") public interface IScheduleClient { / * 添加任务 * @param task 任务对象 * @return 任务id */ @PostMapping("/api/v1/task/add") public ResponseResult addTask(@RequestBody Task task); / * 取消任务 * @param taskId 任务id * @return 取消结果 */ @GetMapping("/api/v1/task/cancel/{taskId}") public ResponseResult cancelTask(@PathVariable("taskId") long taskId); / * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority); }
在heima-leadnews-schedule微服务下提供对应的实现
package com.heima.schedule.feign; import com.heima.apis.schedule.IScheduleClient; import com.heima.model.common.dtos.ResponseResult; import com.heima.model.schedule.dtos.Task; import com.heima.schedule.service.TaskService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RestController public class ScheduleClient implements IScheduleClient { @Autowired private TaskService taskService; / * 添加任务 * @param task 任务对象 * @return 任务id */ @PostMapping("/api/v1/task/add") @Override public ResponseResult addTask(@RequestBody Task task) { return ResponseResult.okResult(taskService.addTask(task)); } / * 取消任务 * @param taskId 任务id * @return 取消结果 */ @GetMapping("/api/v1/task/cancel/{taskId}") @Override public ResponseResult cancelTask(@PathVariable("taskId") long taskId) { return ResponseResult.okResult(taskService.cancelTask(taskId)); } / * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") @Override public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) { return ResponseResult.okResult(taskService.poll(type,priority)); } }
②发布文章集成添加延迟队列接口
在创建WmNewsTaskService
package com.heima.wemedia.service; import com.heima.model.wemedia.pojos.WmNews; public interface WmNewsTaskService { / * 添加任务到延迟队列中 * @param id 文章的id * @param publishTime 发布的时间 可以做为任务的执行时间 */ public void addNewsToTask(Integer id, Date publishTime); }
实现
package com.heima.wemedia.service.impl; import com.heima.apis.schedule.IScheduleClient; import com.heima.model.common.enums.TaskTypeEnum; import com.heima.model.schedule.dtos.Task; import com.heima.model.wemedia.pojos.WmNews; import com.heima.utils.common.ProtostuffUtil; import com.heima.wemedia.service.WmNewsTaskService; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Service @Slf4j public class WmNewsTaskServiceImpl implements WmNewsTaskService { @Autowired private IScheduleClient scheduleClient; / * 添加任务到延迟队列中 * @param id 文章的id * @param publishTime 发布的时间 可以做为任务的执行时间 */ @Override @Async public void addNewsToTask(Integer id, Date publishTime) { log.info("添加任务到延迟服务中----begin"); Task task = new Task(); task.setExecuteTime(publishTime.getTime()); task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType()); task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); WmNews wmNews = new WmNews(); wmNews.setId(id); task.setParameters(ProtostuffUtil.serialize(wmNews)); scheduleClient.addTask(task); log.info("添加任务到延迟服务中----end"); } }
枚举类
package com.heima.model.common.enums; import lombok.AllArgsConstructor; import lombok.Getter; @Getter @AllArgsConstructor public enum TaskTypeEnum { NEWS_SCAN_TIME(1001, 1,"文章定时审核"), REMOTEERROR(1002, 2,"第三方接口调用失败,重试"); private final int taskType; //对应具体业务 private final int priority; //业务不同级别 private final String desc; //描述信息 }
修改发布文章代码:
把之前的异步调用修改为调用延迟任务
@Autowired private WmNewsTaskService wmNewsTaskService; / * 发布修改文章或保存为草稿 * @param dto * @return */ @Override public ResponseResult submitNews(WmNewsDto dto) { //0.条件判断 if(dto == null || dto.getContent() == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //1.保存或修改文章 WmNews wmNews = new WmNews(); //属性拷贝 属性名词和类型相同才能拷贝 BeanUtils.copyProperties(dto,wmNews); //封面图片 list---> string if(dto.getImages() != null && dto.getImages().size() > 0){ //[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg String imageStr = StringUtils.join(dto.getImages(), ","); wmNews.setImages(imageStr); } //如果当前封面类型为自动 -1 if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){ wmNews.setType(null); } saveOrUpdateWmNews(wmNews); //2.判断是否为草稿 如果为草稿结束当前方法 if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){ return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); } //3.不是草稿,保存文章内容图片与素材的关系 //获取到文章内容中的图片信息 List<String> materials = ectractUrlInfo(dto.getContent()); saveRelativeInfoForContent(materials,wmNews.getId()); //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片 saveRelativeInfoForCover(dto,wmNews,materials); //审核文章 // wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime()); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
③消费任务进行审核文章
WmNewsTaskService中添加方法
/ * 消费延迟队列数据 */ public void scanNewsByTask();
实现
@Autowired private WmNewsAutoScanServiceImpl wmNewsAutoScanService; / * 消费延迟队列数据 */ @Scheduled(fixedRate = 1000) @Override @SneakyThrows public void scanNewsByTask() { log.info("文章审核---消费任务执行---begin---"); ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); if(responseResult.getCode().equals(200) && responseResult.getData() != null){ String json_str = JSON.toJSONString(responseResult.getData()); Task task = JSON.parseObject(json_str, Task.class); byte[] parameters = task.getParameters(); WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class); System.out.println(wmNews.getId()+"-----------"); wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); } log.info("文章审核---消费任务执行---end---"); }
在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling
六、自媒体文章上下架
需求
思路
会产生耦合
使用MQ可以解耦
6.1、消息中间件
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统
- producer:发布消息的对象称之为主题生产者(Kafka topic producer)
- topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
6.2、kafka安装配置
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
- Docker安装zookeeper
下载镜像
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
- Docker安装kafka
下载镜像
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \ --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \ --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \ --net=host wurstmeister/kafka:2.12-2.3.1
6.3、kafka入门
生产者发送消息
生产者发送消息
package com.heima.kafka.sample; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; / * 生产者 */ public class ProducerQuickStart { public static void main(String[] args) { //1.kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092"); //发送失败,失败的重试次数 properties.put(ProducerConfig.RETRIES_CONFIG,5); //消息key的序列化器 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //消息value的序列化器 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //2.生产者对象 KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties); //封装发送的消息 / 第一个参数topic;第二个参数消息的key;第三个参数消息的value */ ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","","hello kafka"); //3.发送消息 producer.send(record); //4.关闭消息通道,必须关闭,否则消息发送不成功 producer.close(); } }
消费者接收消息
package com.heima.kafka.sample; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; / * 消费者 */ public class ConsumerQuickStart { public static void main(String[] args) { //1.添加kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092"); //消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2"); //消息的反序列化器 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //2.消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); //3.订阅主题 consumer.subscribe(Collections.singletonList("itheima-topic")); //当前线程一直处于监听状态 while (true) { //4.获取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.key()); System.out.println(consumerRecord.value()); } } } }
同一个消费者组下的消费者只有一个能消费到topic
分区
分区策略
6.4、kafka高可用设计
集群
备份机制
ISR(in-sync replica)需要同步复制保存的follower
如果leader失效后,需要选出新的leader,选举的原则如下:
第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的
第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取
极端情况,就是所有副本都失效了,这时有两种方案
第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定
第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整
6.5、kafka生产者详解
发送类型
同步发送
使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
RecordMetadata recordMetadata = producer.send(kvProducerRecord).get(); System.out.println(recordMetadata.offset());
异步发送
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数
//异步消息发送 producer.send(kvProducerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e != null){ System.out.println("记录异常信息到日志表中"); } System.out.println(recordMetadata.offset()); } });
参数详解
6.6、kafka消费者详解
消费者组
- 消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体
- 一个发布在Topic上消息被分发给此消费者组中的一个消费者
- 所有的消费者都在一个组中,那么这就变成了queue模型
- 所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型
消息有序性
应用场景:
- 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致
- 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序
topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。
6.7、springboot集成kafka
①导入spring-kafka依赖信息
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency> </dependencies>
②在resources下创建文件application.yml
server: port: 9991 spring: application: name: kafka-demo kafka: bootstrap-servers: 192.168.200.130:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
③消息生产者
package com.heima.kafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @GetMapping("/hello") public String hello(){ kafkaTemplate.send("itcast-topic","黑马程序员"); return "ok"; } }
④消息消费者
package com.heima.kafka.listener; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Component public class HelloListener { @KafkaListener(topics = "itcast-topic") public void onMessage(String message){ if(!StringUtils.isEmpty(message)){ System.out.println(message); } } }
目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式
方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍
方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式
- 发送消息
@GetMapping("/hello") public String hello(){ User user = new User(); user.setUsername("xiaowang"); user.setAge(18); kafkaTemplate.send("user-topic", JSON.toJSONString(user)); return "ok"; }
- 接收消息
package com.heima.kafka.listener; import com.alibaba.fastjson.JSON; import com.heima.kafka.pojo.User; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Component public class HelloListener { @KafkaListener(topics = "user-topic") public void onMessage(String message){ if(!StringUtils.isEmpty(message)){ User user = JSON.parseObject(message, User.class); System.out.println(user); } } }
6.8、自媒体文章上下架–自媒体端
流程图
接口定义
①在heima-leadnews-wemedia工程下的WmNewsController新增方法
@PostMapping("/down_or_up") public ResponseResult downOrUp(@RequestBody WmNewsDto dto){ return wmNewsService.downOrUp(dto); }
在WmNewsDto中新增enable属性 ,完整的代码如下:
package com.heima.model.wemedia.dtos; import lombok.Data; import java.util.Date; import java.util.List; @Data public class WmNewsDto { private Integer id; / * 标题 */ private String title; / * 频道id */ private Integer channelId; / * 标签 */ private String labels; / * 发布时间 */ private Date publishTime; / * 文章内容 */ private String content; / * 文章封面类型 0 无图 1 单图 3 多图 -1 自动 */ private Short type; / * 提交时间 */ private Date submitedTime; / * 状态 提交为1 草稿为0 */ private Short status; / * 封面图片列表 多张图以逗号隔开 */ private List<String> images; / * 上下架 0 下架 1 上架 */ private Short enable; }
②在WmNewsService新增方法
/ * 文章的上下架 * @param dto * @return */ public ResponseResult downOrUp(WmNewsDto dto);
实现
/ * 文章的上下架 * @param dto * @return */ @Override public ResponseResult downOrUp(WmNewsDto dto) { //1.检查参数 if(dto.getId() == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //2.查询文章 WmNews wmNews = getById(dto.getId()); if(wmNews == null){ return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在"); } //3.判断文章是否已发布 if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架"); } //4.修改文章enable if(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){ update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable()) .eq(WmNews::getId,wmNews.getId())); } return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
6.9、自媒体文章上下架–消息通知article端文章上下架
①在heima-leadnews-common模块下导入kafka依赖
<!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
②在自媒体端的nacos配置中心配置kafka的生产者
spring: kafka: bootstrap-servers: 192.168.200.130:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
③在自媒体端文章上下架后发送消息
//发送消息,通知article端修改文章配置 if(wmNews.getArticleId() != null){ Map<String,Object> map = new HashMap<>(); map.put("articleId",wmNews.getArticleId()); map.put("enable",dto.getEnable()); kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map)); }
常量类
public class WmNewsMessageConstants { public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic"; }
④在article端的nacos配置中心配置kafka的消费者
spring: kafka: bootstrap-servers: 192.168.200.130:9092 consumer: group-id: ${spring.application.name} key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
⑤在article端编写监听,接收数据
package com.heima.article.listener; import com.alibaba.fastjson.JSON; import com.heima.article.service.ApArticleConfigService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @Slf4j public class ArtilceIsDownListener { @Autowired private ApArticleConfigService apArticleConfigService; @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC) public void onMessage(String message){ if(StringUtils.isNotBlank(message)){ Map map = JSON.parseObject(message, Map.class); apArticleConfigService.updateByMap(map); log.info("article端文章配置修改,articleId={}",map.get("articleId")); } } }
⑥修改ap_article_config表的数据
新建ApArticleConfigService
package com.heima.article.service; import com.baomidou.mybatisplus.extension.service.IService; import com.heima.model.article.pojos.ApArticleConfig; import java.util.Map; public interface ApArticleConfigService extends IService<ApArticleConfig> { / * 修改文章配置 * @param map */ public void updateByMap(Map map); }
实现类
package com.heima.article.service.impl; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.heima.article.mapper.ApArticleConfigMapper; import com.heima.article.service.ApArticleConfigService; import com.heima.model.article.pojos.ApArticleConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Map; @Service @Slf4j @Transactional public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService { / * 修改文章配置 * @param map */ @Override public void updateByMap(Map map) { //0 下架 1 上架 Object enable = map.get("enable"); boolean isDown = true; if(enable.equals(1)){ isDown = false; } //修改文章配置 update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown)); } }
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/109929.html




































6.6、kafka消费者详解


