腾讯mini项目-【指标监控服务重构】2023-08-22

2023-09-16 01:30:19

今日已办

50字项目价值和重难点

项目价值

通过将指标监控组件接入项目,对比包括其配套工具在功能、性能上的差异、优劣,给出监控服务瘦身的建议

top3难点

  1. 减少监控服务资源成本,考虑性能优化
  2. 如何证明我们在监控服务差异、优劣方面的断言
  3. 监控服务无感化,支持代码可扩展

总监回复

小而美的监控服务

怎么为之小? 怎么为之美? 要小要美的关键点是什么?是你们对于这个服务的核心的把握,最内核的把握,是如何做好减法。多看点做减法的文章,难的地方是,如何先想厚再做薄。

Watermill

Replace sarama.SyncProducer -> kafka-client.Producer

  • 测试发现 WriteKafka 的 Span 耗时较久,Watermill-kafka 的 Publisher 的 Producer 是 sarama.SyncProducer,同步写入耗时较久,而 sarama.ASyncProducer的 API 不够直观,需求也需要我们更改底层库为 kafka-go

  • 对比原先写入kafka,发现 kafka-client(依赖 kafka-go) 的 Producer 可以支持同步or异步消息写入,故修改 Publisher 的实现

  • profile/internal/watermill/watermillkafka/marshaler.go

    package watermillkafka
    
    import (
    	"github.com/Shopify/sarama"
    	"github.com/ThreeDotsLabs/watermill/message"
    	"github.com/pkg/errors"
    	"github.com/segmentio/kafka-go"
    )
    
    const UUIDHeaderKey = "_watermill_message_uuid"
    const HeaderKey = "_key"
    
    // Marshaler marshals Watermill's message to Kafka message.
    type Marshaler interface {
    	Marshal(topic string, msg *message.Message) (*kafka.Message, error)
    }
    
    // Unmarshaler unmarshals Kafka's message to Watermill's message.
    type Unmarshaler interface {
    	Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
    }
    
    type MarshalerUnmarshaler interface {
    	Marshaler
    	Unmarshaler
    }
    
    type DefaultMarshaler struct{}
    
    func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*kafka.Message, error) {
    	if value := msg.Metadata.Get(UUIDHeaderKey); value != "" {
    		return nil, errors.Errorf("metadata %s is reserved by watermill for message UUID", UUIDHeaderKey)
    	}
    
    	headers := []kafka.Header{{
    		Key:   UUIDHeaderKey,
    		Value: []byte(msg.UUID),
    	}}
    	var msgKey string
    	for key, value := range msg.Metadata {
    		if key == HeaderKey {
    			msgKey = value
    		} else {
    			headers = append(headers, kafka.Header{
    				Key:   key,
    				Value: []byte(value),
    			})
    		}
    	}
    
    	return &kafka.Message{
    		Topic:   topic,
    		Key:     []byte(msgKey),
    		Value:   msg.Payload,
    		Headers: headers,
    	}, nil
    }
    
    func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*message.Message, error) {
    	var messageID string
    	metadata := make(message.Metadata, len(kafkaMsg.Headers))
    
    	for _, header := range kafkaMsg.Headers {
    		if string(header.Key) == UUIDHeaderKey {
    			messageID = string(header.Value)
    		} else {
    			metadata.Set(string(header.Key), string(header.Value))
    		}
    	}
    
    	msg := message.NewMessage(messageID, kafkaMsg.Value)
    	msg.Metadata = metadata
    
    	return msg, nil
    }
    
  • profile/internal/watermill/watermillkafka/publisher.go

package watermillkafka

import (
	"context"
	kc "github.com/Kevinello/kafka-client"
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"profile/internal/connector"
	"profile/internal/log"
	"profile/internal/watermill/model"
)

type Publisher struct {
	config   PublisherConfig
	producer *kc.Producer
	logger   watermill.LoggerAdapter

	closed bool
}

// NewPublisher creates a new Kafka Publisher.
func NewPublisher(
	config PublisherConfig,
	logger watermill.LoggerAdapter,
) (*Publisher, error) {
	if err := config.Validate(); err != nil {
		return nil, err
	}

	if logger == nil {
		logger = watermill.NopLogger{}
	}

	producer, err := kc.NewProducer(context.Background(), config.KcProducerConfig)
	if err != nil {
		return nil, errors.Wrap(err, "cannot create Kafka producer")
	}

	return &Publisher{
		config:   config,
		producer: producer,
		logger:   logger,
	}, nil
}

type PublisherConfig struct {
	// Kafka brokers list.
	Brokers []string

	// Marshaler is used to marshal messages from Watermill format into Kafka format.
	Marshaler Marshaler

	// KcProducerConfig configuration object used to create new instances of Producer
	KcProducerConfig kc.ProducerConfig
}

func (c PublisherConfig) Validate() error {
	if len(c.Brokers) == 0 {
		return errors.New("missing brokers")
	}
	if c.Marshaler == nil {
		return errors.New("missing marshaler")
	}
	return c.KcProducerConfig.Validate()
}

// Publish publishes message to Kafka.
//
// Publish is blocking and wait for ack from Kafka.
// When one of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
	if p.closed {
		return errors.New("publisher closed")
	}

	logFields := make(watermill.LogFields, 2)
	logFields["topic"] = topic

	for _, msg := range msgs {
		logFields["message_uuid"] = msg.UUID
		p.logger.Trace("Sending message to Kafka", logFields)

		kafkaMsg, err := p.config.Marshaler.Marshal(topic, msg)
		if err != nil {
			return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
		}

		data := msg.Context().Value("data").(*model.ConsumeCtxData)
		err = p.producer.WriteMessages(msg.Context(), *kafkaMsg)
		if err != nil {
			log.Logger.ErrorContext(msg.Context(), "send message to kafka error", zap.Error(err))
			data.WriteKafkaSpan.End()
			data.RootSpan.End()
			return errors.Wrapf(err, "cannot produce message %s", msg.UUID)
		}
		data.WriteKafkaSpan.End()
		log.Logger.Info("[WriteKafka] write kafka success",
			zap.String("topic", connector.GetTopic(data.Event.Category)),
			zap.String("id", data.Event.ID), zap.Any("msg", data.Event),
			zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))

		p.logger.Trace("Message sent to Kafka", logFields)
		data.RootSpan.End()
	}

	return nil
}

func (p *Publisher) Close() error {
	if p.closed {
		return nil
	}
	p.closed = true

	if err := p.producer.Close(); err != nil {
		return errors.Wrap(err, "cannot close Kafka producer")
	}

	return nil
}
  • 测试发现性能对比先前修改有了大量提升,(还测试了不同环境-docker/本地,不同配置-同步/异步的区别),docker环境,开启异步是效率最高的

image-20230822164942693

image-20230822165245908

image-20230822165306809

明日待办

  1. benchmark:watermill 和 baserunner
更多推荐

D. Boris and His Amazing Haircut

Problem-D-Codeforces问题描述:剪发,将数组a减为数组b,有m个剪刀,每个剪刀只可以用一次且可以在任意区间内剪发,将长度大于mi的减为mi。现在有m数组,数组元素是第i个剪刀可以剪到mi,问能否将a减为b。洛谷翻译:思路:一定是先减最长的,再减短的。在减的时候会将这个a数组渐渐减成多个数组,再对这些数

权限控制、Spring Security入门

权限控制认证和授权概念问题1:在生产环境下如果不登录后台系统就可以完成这些某些功能操作吗?答案显然是否定的,要操作这些功能必须首先登录到系统才可以。问题2:是不是所有用户,只要登录成功就都可以操作所有功能呢?答案是否定的,并不是所有的用户都可以操作这些功能。不同的用户可能拥有不同的权限,这就需要进行授权了。认证:系统提

【已解决】ModuleNotFoundError: No module named ‘torchnet‘

问题描述今天在复现Chinese-Chatbot-PyTorch-Implementation的时候出现了一些问题:包括且不限于ModuleNotFoundError:Nomodulenamed'torchnet',ModuleNotFoundError:Nomodulenamed'fire',ModuleNotFou

重磅!OpenAI将发布DALL·E 3,多模态ChatGPT来了!

9月21日凌晨,OpenAI在官网宣布,在今年10月份将通过API向ChatGPTPlus和企业版用户提供全新文本生成图片产品——DALL·E3。这意味着,ChatGPT在DALL·E3加持下将开启久违的多模态输出模式,用户通过文本就能直接在ChatGPT中生成各种类型图片。例如,在ChatGPT中输入,生成一只可爱的

UE5学习笔记(1)——从源码开始编译安装UE5

目录0.前期准备1.Gitbashhere2.克隆官方源码。3.选择安装分支4.运行Setup.bat,下载依赖文件5.运行GenerateProjectFiles.bat生成工程文件6.生成完成,找到UE5.sln/UE4.sln7.大功告成0.前期准备0.1在windows的话,建议装一个Gitbash,同时还要有

多线程并发或线程安全问题如何解决

1、通过volatile关键字修饰变量,可以实现线程之间的可见性,避免变量脏读的出现,底层是通过限制jvm指令的重新排序实现的,适用于一个线程修改,多个线程读的场景。2、通过synchronized锁(任意对象)来实现线程同步,自动锁的思想,底层实现原理:当又线程进入同步代码快之后,利用jvm的计数器将锁的标记位置为1

开学季ipad电容笔哪款好?便宜的电容笔推荐

随着数码产品不断地更新和添加新的特性功能,iPad的平板已经可以和笔记本电脑相媲美了。而时至今日,随着技术的进步,ipad已经不再是一款单纯的娱乐设备,而是一款集学习、绘画、办公于一体的功能。为提高生产力,搭配上一款好用的电容笔是很有必要的。随着苹果Pencil的普及,国产平板电脑也在迅速发展,下面我就为大家介绍几款高

抖音矩阵系统-60+自媒体平台一键发布-短视频获客系统

当老板做企业的,想在抖音上做生意的,一定一定要开通蓝V企业号线索版来做矩阵。看看我每天的线索都是999+,客服都接待不过来,现在还有300个客资未分配,抖音都在推了企业员工号的玩法,你还在质疑什么?在抖音上做矩阵,真的能将你的生意放大100倍!你只有一个账号,每天就发一条视频,而我们的客户有几百个账号,每天发上千条视频

软件需求文档、设计文档、开发文档、运维文档大全

在软件开发过程中,文档扮演着至关重要的角色。它不仅记录了项目的需求、设计和开发过程,还为项目的维护和管理提供了便利。本文将详细介绍软件开发文档的重要性和作用,以及需求分析、软件设计、开发过程、运维管理和项目管理等方面的文档要求。引言软件开发文档是一种用于记录、沟通和理解软件开发过程的工具。它不仅提供了软件的详细描述,而

从 AI 代码生成模型到 AI 编程助手应用实战

▼最近直播超级多,预约保你有收获近期直播:《从AI编程助手到AIAgent应用实战》随着科技的发展,软件设计方式也在不断地演进,从最初的面向机器,到面向过程,再到面向对象,面向领域,最后到现在快要成为可能的面向自然语言。在软件设计开发领域,我们一直在探索更高效的开发方式—1—AI编程的发展史AI编程的前身,自动代码生成

OJ练习第178题——收集树中金币

收集树中金币力扣链接:2603.收集树中金币题目描述给你一个n个节点的无向无根树,节点编号从0到n-1。给你整数n和一个长度为n-1的二维整数数组edges,其中edges[i]=[ai,bi]表示树中节点ai和bi之间有一条边。再给你一个长度为n的数组coins,其中coins[i]可能为0也可能为1,1表示节点i处

热文推荐