一键搞定集群更新与测试 Pulsar更新智能化
背景
由于我在公司外部担任保养Pulsar,须要时不时的更新Pulsar版本从而和社区坚持分歧。
而每次更新环节都须要做相反的步骤:
命令行工具
以上的流程步骤最好是所有一键成功,咱们只须要人工检测下监控能否反常即可。
于是我便写了一个命令行工具,口头流程如下:
pulsar-upgrade-cli -hok | at 10:33:18A cli app for upgrading PulsarUsage:pulsar-upgrade-cli [command]Available Commands:completionGenerate the autocompletion script for the specified shellhelpHelp about any commandinstallinstall a target versionscalescale statefulSet of the clusterFlags:--burst-limit intclient-side default throttling limit (default 100)--debugenable verbose output-h, --helphelp for pulsar-upgrade-cli--kube-apiserver stringthe address and the port for the Kubernetes API server--kube-as-group stringArraygroup to impersonate for the operation, this flag can be repeated to specify multiple groups.--kube-as-user stringusername to impersonate for the operation
实在经常使用的example如下:
pulsar-upgrade-cli install \--values ./charts/pulsar/values.yaml \--set namespace=pulsar-test \--set initialize=true \--debug \--test-case-schema=http \--test-case-host=127.0.0.1 \--test-case-port=9999 \pulsar-test ./charts/pulsar -n pulsar-test
它的装置命令十分相似于helm,也是间接经常使用 helm 的value.yaml启动装置;只是在装置成功后(期待一切的 Pod 都处于 Running 形态)会再触发 test-case 测试,也就是恳求一个 endpoint。
同时还提供了一个 scale(扩、缩容) 命令,可以用修正集群规模:
# 缩容集群规模为0./pulsar-upgrade-cli scale --replicase 0 -n pulsar-test# 缩容为最小集群./pulsar-upgrade-cli scale --replicase 1 -n pulsar-test# 复原为最满集群./pulsar-upgrade-cli scale --replicase 2 -n pulsar-test
这个需求是由于咱们的Pulsar测试集群部署在了一个servless的kubernetes集群里,它是依照经常使用量不要钱的,所以在我不须要的经常使用的时刻可以经过这个命令将一切的正本数量修正为 0,从而缩小经常使用老本。
当只须要做便捷的性能测试时便回将集群修正为最小集群,将正本数修正为只可以提供服务即可。
而当须要做性能测试时就须要将集群修正为最高性能。
这样可以防止每次都装置新集群,同时也可以有效的缩小测试老本。
成功原理
require (github.com/spf13/cobra v1.6.1github.com/spf13/pflag v1.0.5helm.sh/helm/v3 v3.10.2)
这个命令行工具实质上是参考了 helm 的命令行成功的,一切关键也是依赖了helm和cobra。
上方以最关键的装置命令为例,外围的是以下的步骤:
func (e *installEvent) FinishInstall(cfg *action.Configuration, name string) error {bar.Increment()bar.Finish()clientSet, err := cfg.KubernetesClientSet()if err != nil {return err}ctx := context.Background()ip, err := GetServiceExternalIp(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-proxy", name))if err != nil {return err}token, err := GetPulsarProxyToken(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-token-proxy-admin", name))if err != nil {return err}// trigger testcaseerr = e.client.Trigger(context.Background(), ip, token)return err}
这里的FinishInstall须要失掉到新装置的 Pulsar 集群的 proxy IP 地址和鉴权所经常使用的token(GetServiceExternalIp()/GetPulsarProxyToken())。
将这两个参数传递给test-case才可以构建出pulsar-client.
这个命令的外围性能就是装置集群和触发测试,以及一些集群的基本运维才干。
测试框架
而关于这里的测试用例也有一些小同伴咨询过,如何对 Pulsar 启动性能测试。
其实 Pulsar 源码中曾经蕴含了简直一切咱们会经常使用到的测试代码,通常上只需新版本的官网镜像曾经推送了那就是跑了一切的单测,品质是可以保障的。
那为什么还须要做性能测试呢?
其实很很便捷,Pulsar这类基础组件官网都有提供基准测试,但咱们想要用于消费环境依然须要自己做压测得出一份属于自己环境下的性能测试报告。
基本目的是要看在自己的业务场景下能否可以满足(包括公司的软配件,不同的业务代码)。
所以这里的性能测试代码有一个很关键的前提就是:须要经常使用实在的业务代码启动测试。
也就是业务在线上经常使用与 Pulsar 关系的代码须要参考性能测试里的代码成功,不然有些疑问就无法在测试环节笼罩到。
成功原理
以上是一个集群的性能测试报告,这里我只要 8 个测试场景(联合实践业务经常使用),思索到未来或许会有新的测试用例,所以在设计这个测试框架时就得思索到裁减性。
AbstractJobDefine job5 =new FailoverConsumerTest(event, "缺点转移消费测试", pulsarClient, 20, admin);CompletableFuture<Void> c5 = CompletableFuture.runAsync(job5::start, EXECUTOR);AbstractJobDefine job6 = new SchemaTest(event,"schema测试",pulsarClient,20,prestoService);CompletableFuture<Void> c6 = CompletableFuture.runAsync(job6::start, EXECUTOR);AbstractJobDefine job7 = new VlogsTest(event,"vlogs test",pulsarClient,20, vlogsUrl);CompletableFuture<Void> c7 = CompletableFuture.runAsync(job7::start, EXECUTOR);CompletableFuture<Void> all = CompletableFuture.allOf(c1, c2, c3, c4, c5, c6, c7);all.whenComplete((___, __) -> {event.finishAll();pulsarClient.closeAsync();admin.close();}).get();
对外提供的 trigger 接口就不贴代码了,重点就是在这里构建测试义务,而后期待他们所有口头终了。
@Datapublic abstract class AbstractJobDefine {private Event event;private String jobName;private PulsarClient pulsarClient;private int timeout;private PulsarAdmin admin;public AbstractJobDefine(Event event, String jobName, PulsarClient pulsarClient, int timeout, PulsarAdmin admin) {this.event = event;this.jobName = jobName;this.pulsarClient = pulsarClient;this.timeout = timeout;this.admin = admin;}public void start() {event.addJob();try {CompletableFuture.runAsync(() -> {StopWatch watch = new StopWatch();try {watch.start(jobName);run(pulsarClient, admin);} catch (Exception e) {event.oneException(this, e);} finally {watch.stop();event.finishOne(jobName, StrUtil.format("cost: {}s", watch.getTotalTimeSeconds()));}}, TestCase.EXECUTOR).get(timeout, TimeUnit.SECONDS);} catch (Exception e) {event.oneException(this, e);}}/** run busy code* @param pulsarClient pulsar client* @param admin pulsar admin client* @throws Exception e*/public abstract void run(PulsarClient pulsarClient, PulsarAdmin admin) throws Exception;}
外围代码就是这个形象的义务定义类,其中的 start 函数用于定义义务口头的模版:
上方来看一个普通用例的成功状况:
就是重写了run()函数,而后在其中成功详细的测试用例,断言测试结果。
这样当咱们须要再参与用例的时刻只须要再新增一个子类成功即可。
同时还须要定义一个事情接口,用于处置一些关键的节点:
public interface Event {/*** 新增一个义务*/void addJob();/** 失掉运转中的义务数量* @return 失掉运转中的义务数量*/TestCaseRuntimeResponse getRuntime();/*** 单个义务口头终了** @param jobName义务称号* @param finishCost 义务成功耗时*/void finishOne(String jobName, String finishCost);/**单个义务口头意外* @param jobDefine 义务* @param e 意外*/void oneException(AbstractJobDefine jobDefine, Exception e);/*** 一切义务口头终了*/void finishAll();}
其中getRuntime接口是用于在 cli 那边查问义务能否口头终了的接口,只要义务口头终了之后才干分开cli。
监控目的
当这些义务运转终了后咱们须要重点检查运行客户端和 Pulsar broker 端能否无心外日志。
同时还须要观察一些关键的监控面板:
蕴含但不限于:
当然还有zookeeper的运转状况也须要监控,限于篇幅就不逐一粘贴了。
以上就是测试整个 Pulsar 集群的流程,当然还有一些须要优化的中央。
比如经常使用命令行还是有些不便,后续或许会切换到网页上就可以操作。