首页 >> 知识 >> docker安装kafka,并集成springboot进行测试

docker安装kafka,并集成springboot进行测试

大家好,今天草莓视频在线观看APP开始学习kafka中间件,今天草莓视频在线观看APP改变一下策略,不刷视频学习,改为实践学习,在网上找一些草莓视频污版免费功能去做,来达到学习实践的目的。

首先,是安装相关组件。

1. docker安装安装

1.1 yum-utils软件包

yum install -y yum-utils

1.2 设置阿里云镜像

yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

1.3 安装docker

yum install docker-ce docker-ce-cli containerd.io

1.4 启动docker

systemctl start docker

1.5 测试

docker versiondocker run hello-worlddocker images

至此,docker就安装完毕了。接下来就是安装zookeeper和kafka了,我这里用的是kafka2.x的版本,因此需要结合zookeeper去是使用。现在最新的kafka3.x已经可以抛弃zookeeper去单独使用了,小伙伴们有兴趣的话可以自己去动手安装实践下。

2. 安装zookeeper和kafka

2.1 docker安装zookeeper

docker pull wurstmeister/zookeeper

2.2 启动zookeeper

docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" --restart always wurstmeister/zookeeper

2.3 docker查看zookeeper容器是否启动

docker ps

 出现以上信息,就代表zookeeper已经安装并启动成功。

2.4 安装kafka

docker pull wurstmeister/kafka

2.5 启动kafka

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=124.223.205.125:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://124.223.205.125:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" -e TZ="Asia/Shanghai" wurstmeister/kafka

2.6 用docker ps查看kafka是否启动

出现以上信息,就代表kafka启动成功了。

下来就测试一下

3. 发送消息和消费消息

3.1 进入kafka容器

docker exec -it 容器id /bin/bashcd /opt/kafka_2.13-2.8.1/bin/

 3.2 连接生产者

./kafka-console-producer.sh --broker-list localhost:9092 --topic shopping

接下来就可以发送消息了。

 3.3 另起一个窗口,重复3.1的动作进入kafka容器,然后连接消费者

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic shopping --from-beginning

这是就能就收消息了。

 到达这里,草莓视频在线观看APP的kafka就安装并测试成功了。

4. 接下来草莓视频在线观看APP就创建Springboot工程来连接kafka进行消息的生产和消费

4.1 pom.xml

4.0.0 org.springframework.boot spring-boot-starter-parent 2.7.4 com.volga kafka 0.0.1-SNAPSHOT Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-devtools runtime true org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.springframework.kafka spring-kafka org.springframework.kafka spring-kafka-test test com.alibaba fastjson 1.2.58 org.springframework.boot spring-boot-maven-plugin

4.2 草莓视频在线观看APP创建一个订单的实体类

@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class Order { /** * 订单id */ private long orderId; /** * 订单号 */ private String orderNum; /** * 订单创建时间 */ private LocalDateTime createTime;}

4.3 创建生产者

@Component@Slf4jpublic class KafkaProvider { /** * 消息 TOPIC */ private static final String TOPIC = "shopping"; @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) { // 构建一个订单类 Order order = Order.builder() .orderId(orderId) .orderNum(orderNum) .createTime(createTime) .build(); // 发送消息,订单类的 json 作为消息体 ListenableFuture future = kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order)); // 监听回调 future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable throwable) { log.info("生产者产生消息 失败 ## Send message fail ..."); } @Override public void onSuccess(SendResult result) { log.info("生产者产生消息 成功 ## Send message success ..."); } }); }}

4.4 创建消费者

@Component@Slf4jpublic class KafkaConsumer { @KafkaListener(topics = "shopping", groupId = "group_id") //这个groupId是在yml中配置的 public void consumer(String message) { log.info("消费者消费信息 ## consumer message: {}", message); }}

4.5 创建测试类

@SpringBootTestpublic class SpringBootKafakaApplicationTests { @Autowired private KafkaProvider kafkaProvider; @Test public void sendMessage() throws InterruptedException { System.out.println("是否为空??+"+kafkaProvider); // 发送 10 个消息 for (int i = 0; i < 10; i++) { long orderId = i+1; String orderNum = UUID.randomUUID().toString(); kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now()); } TimeUnit.MINUTES.sleep(1); }}

4.6 要创建一个Application方法,不然项目会启动报错

@SpringBootApplicationpublic class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class,args); }}

4.7 配置application.yml

spring: kafka: # 指定 kafka 地址,我这里部署在的虚拟机,开发环境是Windows,kafkahost是虚拟机的地址, 若外网地址,注意修改为外网的IP( 集群部署需用逗号分隔) bootstrap-servers: 服务器ip:9092 consumer: # 指定 group_id group-id: group_id auto-offset-reset: earliest # 指定消息key和消息体的序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: # 发生错误后,消息重发的次数。 retries: 0 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 指定消息key和消息体的序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringSerializer value-deserializer: org.apache.kafka.common.serialization.StringSerializer listener: # 在侦听器容器中运行的线程数。 concurrency: 5 #listner负责ack,每调用一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: false

以上就创建项目成功了,草莓视频在线观看APP运行测试方法,就能获取kafka中的消息了。

 生产消息

  消费消息

这里就是简单实现了kafka的消息生产和消费,后续的kafka复杂场景的实现会持续更新。

我是空谷有来人,谢谢支持。 

网站地图