package main
import (
"context"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"github.com/gin-gonic/gin"
"github.com/segmentio/kafka-go"
"net/http"
"time"
)
// kafkaConfig is the sarama configuration shared by the producer and the
// consumer in this file.
//
// It is derived from sarama.NewConfig so that every option not set explicitly
// keeps the library default rather than a zero value.
var kafkaConfig = newKafkaConfig()

// newKafkaConfig builds the sarama configuration used by this file.
func newKafkaConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0
	cfg.Net.MaxOpenRequests = 5

	// Producer: retry up to 5 times and wait for all in-sync replicas to
	// acknowledge, giving the broker 5 seconds to do so.
	cfg.Producer.Retry.Max = 5
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Timeout = 5 * time.Second
	// sarama.NewSyncProducer rejects a config that does not return successes.
	cfg.Producer.Return.Successes = true
	// PublishOrder sets ProducerMessage.Partition; only the manual partitioner
	// honors that field (the default hash partitioner overwrites it).
	cfg.Producer.Partitioner = sarama.NewManualPartitioner

	// Consumer: wait at most 100ms for the broker to fill a fetch response.
	cfg.Consumer.MaxWaitTime = 100 * time.Millisecond
	// ConsumeOrder reads PartitionConsumer.Errors(); errors are delivered on
	// that channel only when this flag is enabled.
	cfg.Consumer.Return.Errors = true

	return cfg
}
// Order describes a single order.
//
// The struct declares no JSON tags, so encoding/json uses the Go field names
// as keys.
type Order struct {
	// Order ID, user ID, goods ID and goods quantity, in that order.
	OrderId, UserId, GoodsId, GoodsNum int64
	// OrderTime is the timestamp of the order.
	OrderTime time.Time
}
// InitConsumer creates a sarama consumer for the broker address kafka:9092
// using the given config.
//
// It panics when the consumer cannot be created, so the returned value is
// always usable.
func InitConsumer(config *sarama.Config) sarama.Consumer {
	brokers := []string{"kafka:9092"}
	client, createErr := sarama.NewConsumer(brokers, config)
	if createErr == nil {
		return client
	}
	panic(createErr)
}
// ConsumeOrder is a gin handler that consumes order messages from partition 0
// of order_topic, starting at the newest offset, for as long as the HTTP
// request stays open.
//
// The handler ends in one of three ways:
//   - the request context is cancelled (client disconnected or server shut
//     down): it returns without writing a response;
//   - the partition consumer fails to start or reports an error: it answers
//     with HTTP 500;
//   - the consumer's channels are closed: it answers with HTTP 200.
//
// Creating the consumer itself still panics on failure, because InitConsumer
// has no error return.
func ConsumeOrder(c *gin.Context) {
	consumer := InitConsumer(kafkaConfig)
	defer consumer.Close()

	partitionConsumer, err := consumer.ConsumePartition("order_topic", 0, sarama.OffsetNewest)
	if err != nil {
		fmt.Println("消费消息失败:", err)
		c.JSON(http.StatusInternalServerError, gin.H{"message": "订单消息消费失败"})
		return
	}
	defer partitionConsumer.Close()

	// Without this the loop would keep running, and keep the Kafka consumer
	// open, long after the caller has gone away.
	ctx := c.Request.Context()

loop:
	for {
		select {
		case <-ctx.Done():
			return
		case msg, ok := <-partitionConsumer.Messages():
			if !ok {
				// A closed channel yields nil messages; stop instead of dereferencing one.
				break loop
			}
			handleOrderMessage(msg.Value)
		case consumeErr, ok := <-partitionConsumer.Errors():
			if !ok {
				break loop
			}
			fmt.Println("消费消息失败:", consumeErr)
			c.JSON(http.StatusInternalServerError, gin.H{"message": "订单消息消费失败"})
			return
		}
	}
	c.JSON(http.StatusOK, gin.H{"message": "订单消息消费成功"})
}

// handleOrderMessage decodes one JSON order payload, then runs the stock
// deduction and the order insert. A failing step is logged and the message is
// skipped so that consumption continues with the next one.
func handleOrderMessage(payload []byte) {
	var order Order
	if err := json.Unmarshal(payload, &order); err != nil {
		fmt.Println("反序列化失败", err)
		return
	}
	if err := ReduceGoodsNum(order); err != nil {
		fmt.Println("扣减库存失败", err)
		return
	}
	if err := InsertOrder(order); err != nil {
		fmt.Println("订单入库失败", err)
		return
	}
	fmt.Println("消费消息成功:", order)
}
// ReduceGoodsNum is the stock-deduction step for an order.
//
// It is a placeholder: neither the stock lookup nor the deduction is
// implemented, so it currently always returns nil.
func ReduceGoodsNum(order Order) error {
	// TODO: query the current stock of the goods.
	// TODO: deduct the stock.
	return nil
}
// InsertOrder is the persistence step for an order.
//
// It is a placeholder: no database access is implemented, so it currently
// always returns nil.
func InsertOrder(order Order) error {
	// TODO: obtain a database handle.
	// TODO: insert the order record.
	return nil
}
// PublishOrder is a gin handler that publishes a fixed sample order, encoded
// as JSON, to partition 0 of order_topic.
//
// It answers with HTTP 200 when the message was sent. Any failure (encoding
// the order, creating the producer, or sending the message) is logged and
// answered with HTTP 500 and the same error body.
func PublishOrder(c *gin.Context) {
	order := Order{
		OrderId:   1,
		UserId:    1,
		GoodsId:   1,
		GoodsNum:  1,
		OrderTime: time.Now(),
	}

	// Encode first so that an encoding failure never opens a broker connection.
	orderJSON, err := json.Marshal(order)
	if err != nil {
		fmt.Println("消息发布失败:", err)
		c.JSON(http.StatusInternalServerError, gin.H{"message": "消息发布失败"})
		return
	}

	producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, kafkaConfig)
	if err != nil {
		fmt.Println("消息发布失败:", err)
		c.JSON(http.StatusInternalServerError, gin.H{"message": "消息发布失败"})
		return
	}
	defer producer.Close()

	message := &sarama.ProducerMessage{
		Topic:     "order_topic",
		Partition: 0,
		Value:     sarama.ByteEncoder(orderJSON),
	}
	if _, _, err = producer.SendMessage(message); err != nil {
		fmt.Println("消息发布失败:", err)
		c.JSON(http.StatusInternalServerError, gin.H{"message": "消息发布失败"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"message": "消息发布成功"})
}
// main registers the publish and consume endpoints and serves HTTP on :8080.
func main() {
	router := gin.Default()
	router.POST("/publishOrder", PublishOrder)
	router.POST("/consumeOrder", ConsumeOrder)

	// Run blocks while serving and returns only on failure (for example when
	// the port is already in use). Surface that instead of exiting silently
	// with status 0.
	if err := router.Run(":8080"); err != nil {
		panic(err)
	}
}