写一段gin使用kafka写订单下单10分钟后批量催大量用户付款的任务
package main
import (
"context"
"time"
"github.com/Shopify/sarama"
"github.com/gin-gonic/gin"
)
type Order struct {
Id int64
Name string
Time int64
}
func main() {
// 初始化gin
r := gin.Default()
// 订单下单处理
r.POST("/order", func(c *gin.Context) {
// 解析订单
var order Order
if err := c.ShouldBind(&order); err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
// 将订单发送到kafka
kafkaProducer(order)
// 订单成功响应
c.JSON(200, gin.H{"status": "ok"})
})
// 开启定时任务,定时查询kafka中10分钟前下单的订单,催大量用户付款
go kafkaConsumer()
r.Run()
}
func kafkaProducer(order Order) {
// 初始化kafka生产者
producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
panic(err)
}
defer producer.Close()
// 构造消息
msg := &sarama.ProducerMessage{
Topic: "order",
Value: sarama.ByteEncoder(order),
}
// 发送消息
if _, _, err := producer.SendMessage(msg); err != nil {
panic(err)
}
}
func kafkaConsumer() {
// 初始化kafka消费者
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
panic(err)
}
defer consumer.Close()
// 订阅topic
partitions, err := consumer.Partitions("order")
if err != nil {
panic(err)
}
for _, partition := range partitions {
// 创建分区消费者
pc, err := consumer.ConsumePartition("order", partition, sarama.OffsetNewest)
if err != nil {
panic(err)
}
// 异步从该分区消费消息
go func(pc sarama.PartitionConsumer) {
for msg := range pc.Messages() {
// 解析消息
var order Order
if err := json.Unmarshal(msg.Value, &order); err != nil {
continue
}
// 下单10分钟后,催大量用户付款
if time.Now().Unix()-order.Time > 10*60 {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second