Yarn的服务化框架分析-ResourceManager主服务简化示例

2023-09-18 21:04:05

Yarn的服务化框架分析

服务化指什么

将yarn中众多的功能拆分为众多的服务,按照定义好的服务框架(规范)进行组织管理。通过Service框架进行服务的管理,通过抽象的Service接口表示抽象的服务。

服务化有什么

Yarn提供的每个服务都有一种状态(STATE),可对服务进行初始化/启动/停止操作。

服务的状态

  • NOTINITED,其值为0,代表服务尚未初始化
  • INITED,其值为1,代表服务已经完成初始化
  • STARTED,其值为2,代表服务已经启动
  • STOPPED,其值为3,代表服务已经停止

服务状态的转换方法

  • init方法的目标是使服务从NOINITED -> INITED进行状态转换

  • start方法的目标是使服务从INITED -> STARTED进行状态转换

  • stop方法的目标是使服务从STARTED->STOPPED进行状态转换

服务的状态不是随意转换的,需要满足转换要求,通过ServiceStateModel中的二维数组statemap来巧妙控制状态的合理转换,

 private static final boolean[][] statemap =
    {
      //                uninited inited started stopped
      /* uninited  */    {false, true,  false,  true},
      /* inited    */    {false, true,  true,   true},
      /* started   */    {false, false, true,   true},
      /* stopped   */    {false, false, false,  true},
    };

横轴代表旧状态,竖轴代表新状态,其数组的值代表是否可以从此旧状态转换到新状态。

服务框架的实现

服务化采用了模板方面模式,将服务状态变化的相关方法抽象到了抽象类AbstractService中,具体服务的开发的时候,只需要关注具体逻辑,不需要重复写状态转换的代码了。AbstractService类只能表示一个独立的Service,并不能表示所有服务。为了满足某一个服务中包含其他子服务管理的场景,提供了CompositeService类,来管理所有子服务服务群组的状态转换方法。

ResourceManager为例简化的类图如下:

实验

入口方法是ResourceManager类的main方法,

执行结果

ResourceManager State is changed to INITED
ResourceManager serviceStart
Adding service AdminService
Adding service Dispatcher
Adding service RMApplicationHistoryWriter
ResourceManager: initing services, size=3
AdminService State is changed to INITED
AdminService serviceInit
AdminService: initing services, size=0
AbstractService serviceInit
AdminService 初始化状态成功,做些其他关联的事情
Dispatcher State is changed to INITED
AsyncDispatcher serviceInit
AbstractService serviceInit
Dispatcher 初始化状态成功,做些其他关联的事情
RMApplicationHistoryWriter State is changed to INITED
RMApplicationHistoryWriter serviceInit
RMApplicationHistoryWriter: initing services, size=0
AbstractService serviceInit
RMApplicationHistoryWriter 初始化状态成功,做些其他关联的事情
AbstractService serviceInit
ResourceManager 初始化状态成功,做些其他关联的事情
ResourceManager serviceStart
ResourceManager: starting services, size=3
AdminService serviceStart
AdminService: starting services, size=0
AbstractService serviceStart
AdminService 服务启动成功,做些其他关联的事情
AsyncDispatcher serviceStart
AbstractService serviceStart
Dispatcher 服务启动成功,做些其他关联的事情
RMApplicationHistoryWriter serviceStart
RMApplicationHistoryWriter: starting services, size=0
AbstractService serviceStart
RMApplicationHistoryWriter 服务启动成功,做些其他关联的事情
AbstractService serviceStart
ResourceManager 服务启动成功,做些其他关联的事情

伪装常驻5s等待关闭
ResourceManager State is changed to STOPPED
ResourceManager serviceStop
ResourceManager: stopping services, size=3
Stopping service #2: com.donny.service.RMApplicationHistoryWriter@330bedb4
RMApplicationHistoryWriter State is changed to STOPPED
RMApplicationHistoryWriter serviceStop
RMApplicationHistoryWriter: stopping services, size=0
AbstractService serviceStop
RMApplicationHistoryWriter report stop, notify anything listening for events
Stopping service #1: com.donny.service.AsyncDispatcher@2503dbd3
Dispatcher State is changed to STOPPED
AsyncDispatcher serviceStop
AbstractService serviceStop
Dispatcher report stop, notify anything listening for events
Stopping service #0: com.donny.service.AdminService@4b67cf4d
AdminService State is changed to STOPPED
AdminService serviceStop
AdminService: stopping services, size=0
AbstractService serviceStop
AdminService report stop, notify anything listening for events
AbstractService serviceStop
ResourceManager report stop, notify anything listening for events

通过日志可以发现,以RM为主服务时,在初始化和启动子服务的时候,是按照子服务的添加到服务群组的顺序启动的,而关闭服务的时候按其子服务添加顺序的倒序进行关闭的。由RM触发服务群组的初始化、启动和关闭,当整个服务群组完成状态转换后,才是RM整个主服务状态转换完成。

具体代码

service类
package com.donny.service;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public interface Service {
    boolean isInState(STATE expected);

    STATE getServiceState();

    void init(Configuration config);

    void start();

    void stop();

    String getName();
}
STATE类
package com.donny.service;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public enum STATE {

    NOTINITED(0, "NOTINITED"),

    INITED(1, "INITED"),

    STARTED(2, "STARTED"),

    STOPPED(3, "STOPPED");

    private final int value;

    private final String statename;

    private STATE(int value, String name) {
        this.value = value;
        this.statename = name;
    }

    public int getValue() {
        return value;
    }

    @Override
    public String toString() {
        return statename;
    }
}
ServiceStateModel类
package com.donny.service;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public class ServiceStateModel {


    private static final boolean[][] statemap =
            {
                    //                uninited inited started stopped
                    /* uninited  */    {false, true, false, true},
                    /* inited    */    {false, true, true, true},
                    /* started   */    {false, false, true, true},
                    /* stopped   */    {false, false, false, true},
            };

    private volatile STATE state;

    private String name;

    public ServiceStateModel(String name) {
        this(name, STATE.NOTINITED);
    }

    public ServiceStateModel(String name, STATE state) {
        this.state = state;
        this.name = name;
    }

    public STATE getState() {
        return state;
    }

    public boolean isInState(STATE proposed) {
        return state.equals(proposed);
    }

    public void ensureCurrentState(STATE expectedState) {
        if (state != expectedState) {
            throw new ServiceStateException(name + ": for this operation, the " +
                    "current service state must be "
                    + expectedState
                    + " instead of " + state);
        }
    }

    public synchronized STATE enterState(STATE proposed) {
        checkStateTransition(name, state, proposed);
        STATE oldState = state;
        //atomic write of the new state
        state = proposed;
        return oldState;
    }

    public static void checkStateTransition(String name,
                                            STATE state,
                                            STATE proposed) {
        if (!isValidStateTransition(state, proposed)) {
            throw new ServiceStateException(name + " cannot enter state "
                    + proposed + " from state " + state);
        }
    }

    public static boolean isValidStateTransition(STATE current,
                                                 STATE proposed) {
        boolean[] row = statemap[current.getValue()];
        return row[proposed.getValue()];
    }

    @Override
    public String toString() {
        return (name.isEmpty() ? "" : ((name) + ": "))
                + state.toString();
    }

}
AbstractService类
package com.donny.service;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public abstract class AbstractService implements Service {
    private final String name;
    private final ServiceStateModel stateModel;
    private final Object stateChangeLock = new Object();
    private volatile Configuration config;

    @Override
    public String getName() {
        return name;
    }

    public AbstractService(String name) {
        this.name = name;
        stateModel = new ServiceStateModel(name);
    }

    protected void setConfig(Configuration conf) {
        this.config = conf;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        if (conf != config) {
            System.out.println("更新配置");
            setConfig(conf);
        }
        System.out.println("AbstractService serviceInit");
    }

    @Override
    public final boolean isInState(STATE expected) {
        return stateModel.isInState(expected);
    }

    @Override
    public final STATE getServiceState() {
        return stateModel.getState();
    }

    private STATE enterState(STATE newState) {
        assert stateModel != null : "null state in " + name + " " + this.getClass();
        STATE oldState = stateModel.enterState(newState);
        if (oldState != newState) {
            // do something,eg:log
            System.out.println(getName() + " State is changed to " + newState.name());
        }
        return oldState;
    }


    @Override
    public void init(Configuration conf) {
        if (conf == null) {
            throw new ServiceStateException("Cannot initialize service "
                    + getName() + ": null configuration");
        }
        if (isInState(STATE.INITED)) {
            return;
        }
        synchronized (stateChangeLock) {
            if (enterState(STATE.INITED) != STATE.INITED) {
                setConfig(conf);
                try {
                    serviceInit(config);
                    if (isInState(STATE.INITED)) {
                        //if the service ended up here during init,
                        //notify the listeners
                        System.out.println(getName() + " 初始化状态成功,做些其他关联的事情");
                    }
                } catch (Exception e) {
                    // 做一些失败补偿操作
                    System.out.println("init is Exception.");
                    throw new ServiceStateException(e);
                }
            }
        }
    }

    @Override
    public void start() {
        if (isInState(STATE.STARTED)) {
            return;
        }
        //enter the started state
        synchronized (stateChangeLock) {
            if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
                try {
                    serviceStart();
                    System.out.println(getName() + " 服务启动成功,做些其他关联的事情");

                } catch (Exception e) {
                    // 做一些失败补偿操作
                    System.out.println("start is Exception.");
                    throw new ServiceStateException(e);
                }
            }
        }
    }

    protected void serviceStart() throws Exception {
        System.out.println("AbstractService serviceStart");
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void stop() {
        if (isInState(STATE.STOPPED)) {
            return;
        }
        synchronized (stateChangeLock) {
            if (enterState(STATE.STOPPED) != STATE.STOPPED) {
                try {
                    serviceStop();
                } catch (Exception e) {
                    // 做一些失败补偿操作
                    System.out.println(getName() + " stop is Exception.");
                    throw new ServiceStateException(e);
                } finally {
                    //report that the service has terminated
                    //notify anything listening for events
                    System.out.println(getName() + " report stop, notify anything listening for events");
                }
            } else {
                System.out.println("服务已经是停止状态.");
            }
        }
    }

    protected void serviceStop() throws Exception {
        System.out.println("AbstractService serviceStop");
    }
}
CompositeService类
package com.donny.service;

import java.util.ArrayList;
import java.util.List;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public class CompositeService extends AbstractService {
    protected static final boolean STOP_ONLY_STARTED_SERVICES = false;
    private final List<Service> serviceList = new ArrayList<Service>();

    public CompositeService(String name) {
        super(name);
    }

    public List<Service> getServices() {
        synchronized (serviceList) {
            return new ArrayList<Service>(serviceList);
        }
    }

    protected void addService(Service service) {
        System.out.println("Adding service " + service.getName());
        synchronized (serviceList) {
            serviceList.add(service);
        }
    }

    protected boolean addIfService(Object object) {
        if (object instanceof Service) {
            addService((Service) object);
            return true;
        } else {
            return false;
        }
    }

    protected synchronized boolean removeService(Service service) {
        synchronized (serviceList) {
            return serviceList.remove(service);
        }
    }

    protected void serviceInit(Configuration conf) throws Exception {
        List<Service> services = getServices();
        System.out.println(getName() + ": initing services, size=" + services.size());
        for (Service service : services) {
            service.init(conf);
        }
        super.serviceInit(conf);
    }

    protected void serviceStart() throws Exception {
        List<Service> services = getServices();
        System.out.println(getName() + ": starting services, size=" + services.size());
        for (Service service : services) {
            // start the service. If this fails that service
            // will be stopped and an exception raised
            service.start();
        }
        super.serviceStart();
    }

    protected void serviceStop() throws Exception {
        //stop all services that were started
        int numOfServicesToStop = serviceList.size();
        System.out.println(getName() + ": stopping services, size=" + numOfServicesToStop);
        stop(numOfServicesToStop, STOP_ONLY_STARTED_SERVICES);
        super.serviceStop();
    }

    private void stop(int numOfServicesStarted, boolean stopOnlyStartedServices) {
        // stop in reverse order of start
        Exception firstException = null;
        List<Service> services = getServices();
        for (int i = numOfServicesStarted - 1; i >= 0; i--) {
            Service service = services.get(i);

            System.out.println("Stopping service #" + i + ": " + service);

            STATE state = service.getServiceState();
            //depending on the stop police
            if (state == STATE.STARTED
                    || (!stopOnlyStartedServices && state == STATE.INITED)) {
                Exception ex = ServiceOperations.stopQuietly(service);
                if (ex != null && firstException == null) {
                    firstException = ex;
                }
            }
        }
        //after stopping all services, rethrow the first exception raised
        if (firstException != null) {
            throw new ServiceStateException(firstException);
        }
    }

}
ServiceStateException类
package com.donny.service;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public class ServiceStateException extends RuntimeException {
    public ServiceStateException() {
        super();
    }

    public ServiceStateException(String msg) {
        super(msg);
    }

    public ServiceStateException(String msg, Throwable cause) {
        super(msg, cause);
    }

    public ServiceStateException(Throwable cause) {
        super(cause);
    }
}
ServiceOperations类
package com.donny.service;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public class ServiceOperations {
    private ServiceOperations() {
    }

    public static void stop(Service service) {
        if (service != null) {
            service.stop();
        }
    }

    public static Exception stopQuietly(Service service) {
        try {
            stop(service);
        } catch (Exception e) {
            System.out.println("When stopping the service " + service.getName()
                    + " : " + e);
            return e;
        }
        return null;
    }
}
AdminService类
package com.donny.service;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public class AdminService extends CompositeService {
    public AdminService() {
        super("AdminService");
    }

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        System.out.println("AdminService serviceInit");
        super.serviceInit(conf);
    }

    @Override
    protected void serviceStart() throws Exception {
        System.out.println("AdminService serviceStart");
        super.serviceStart();
    }

    @Override
    protected void serviceStop() throws Exception {
        System.out.println("AdminService serviceStop");
        super.serviceStop();
    }
}
AsyncDispatcher类
package com.donny.service;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public class AsyncDispatcher extends AbstractService {
    public AsyncDispatcher() {
        super("Dispatcher");
    }

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        System.out.println("AsyncDispatcher serviceInit");
        super.serviceInit(conf);
    }

    @Override
    protected void serviceStart() throws Exception {
        //start all the components
        System.out.println("AsyncDispatcher serviceStart");
        super.serviceStart();
    }

    @Override
    protected void serviceStop() throws Exception {
        System.out.println("AsyncDispatcher serviceStop");
        super.serviceStop();
    }
}
RMApplicationHistoryWriter类
package com.donny.service;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public class RMApplicationHistoryWriter extends CompositeService {
    public RMApplicationHistoryWriter() {
        super("RMApplicationHistoryWriter");
    }

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        System.out.println("RMApplicationHistoryWriter serviceInit");
        super.serviceInit(conf);
    }

    @Override
    protected void serviceStart() throws Exception {
        System.out.println("RMApplicationHistoryWriter serviceStart");
        super.serviceStart();
    }

    @Override
    protected void serviceStop() throws Exception {
        System.out.println("RMApplicationHistoryWriter serviceStop");
        super.serviceStop();
    }
}
Configuration类
package com.donny.service;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public class Configuration {
    public String getCustomConfig() {
        return customConfig;
    }

    public void setCustomConfig(String customConfig) {
        this.customConfig = customConfig;
    }

    private String customConfig;

}
ResourceManager类
package com.donny.service;

/**
 * @author 1792998761@qq.com
 * @description
 * @date 2023/9/18
 */
public class ResourceManager extends CompositeService {
    private Configuration conf;
    protected AdminService adminService;
    protected AsyncDispatcher asyncDispatcher;
    protected RMApplicationHistoryWriter rmApplicationHistoryWriter;

    public ResourceManager() {
        super("ResourceManager");
    }

    protected void serviceInit(Configuration conf) throws Exception {
        System.out.println("ResourceManager serviceStart");
        this.conf = conf;
        adminService = new AdminService();
        asyncDispatcher = new AsyncDispatcher();
        rmApplicationHistoryWriter = new RMApplicationHistoryWriter();
        addService(adminService);
        addIfService(asyncDispatcher);
        addService(rmApplicationHistoryWriter);

        super.serviceInit(this.conf);
    }

    @Override
    protected void serviceStart() throws Exception {
        //start all the components
        System.out.println("ResourceManager serviceStart");
        super.serviceStart();
    }

    @Override
    protected void serviceStop() throws Exception {
        System.out.println("ResourceManager serviceStop");
        super.serviceStop();
    }

    public static void main(String argv[]) throws Exception {
        ResourceManager resourceManager = new ResourceManager();
        Configuration configuration = new Configuration();
        configuration.setCustomConfig("自定义配置");
        resourceManager.setConfig(configuration);

        resourceManager.init(configuration);
        resourceManager.start();
        System.out.println("伪装常驻5s等待关闭");
        Thread.sleep(5000);
        resourceManager.stop();
    }
}
更多推荐

zabbix

zabbix是一个基于Web界面的提供分布式系统监视以及网络监视功能的企业级的开源解决方案。zabbix能监视各种网络参数,保证服务器系统的安全运营;并提供灵活的通知机制以让系统管理员快速定位/解决存在的各种问题。zabbix由2部分构成,zabbixserver与可选组件zabbixagent。通过C/S模式采集数据

sqlite条件查询语句where

文章目录导入CSV数据where其他运算符sqlite初步导入CSV数据查询是数据库中最频繁的操作,但学习查询指令,最起码要有可供查询的表格。比如现在有下面这些2022排名前20的国家GDP的数据,当然格式是csv的,我们要做的第一步就是将其转为sqilte格式。1,美国,美洲,254600,0.2532042,中国,

Excel_字母数字混合排序(数字不符合预期)的一种解决方法

引ADC_DCAL_DN1[13:0]ADC_DCAL_DN10[13:0]ADC_DCAL_DN11[13:0]ADC_DCAL_DN2[13:0]ADC_DCAL_DN3[13:0]ADC1_EOCADC10_EOCADC11_EOCADC2_EOCADC3_EOCADC4_EOCADC5_EOCADC_CALCB

酷开科技夯实流量基础,构建智慧生活新风尚!

在这个日新月异的时代,智能化趋势加速发展,依托于互联网服务的OTT也越来越贴近消费者的居家生活,并在家用场景下释放出独特的大屏营销价值。成立于2006年的酷开科技,一直是智能电视行业前进道路上的坚实力量,自创建以来便一直为互联网智能电视行业提供技术与运维服务。酷开科技以内容技术服务和数字营销服务为核心,自主研发了一套智

Leetcode.213 打家劫舍 II

题目链接Leetcode.213打家劫舍IImid题目描述你是一个专业的小偷,计划偷窃沿街的房屋,每间房内都藏有一定的现金。这个地方所有的房屋都围成一圈,这意味着第一个房屋和最后一个房屋是紧挨着的。同时,相邻的房屋装有相互连通的防盗系统,如果两间相邻的房屋在同一晚上被小偷闯入,系统会自动报警。给定一个代表每个房屋存放金

Golang 的 GMP:并发编程的艺术

前言在Golang的并发编程中,GMP是一个重要的概念,它代表了Goroutine、M(线程)和P(调度器)。这个强大的三位一体的并发模型使得Golang在处理并发任务时非常高效和灵活。通过GMP的组合,Golang实现了一种高效的并发模型。它充分利用了多核处理器的优势,并通过轻量级的Goroutine实现了高并发的编

typescript 高级类型-class类详解

class简介typescript全面支持es2015中引入的class关键字,并为其添加了类型注解,和其它语法(比如,可见性修饰符等),class基本使用,如下tips1.根据ts中的类型推论,可以知道Person的实例对象p的类型是Person2.ts中的class,不仅提供了class的语法功能,也作为一种类型存

坚鹏:浙江农商联合银行同业核心产品解读与差异化分析培训第8期

浙江农商联合银行同业核心产品解读与差异化分析培训第8期1952年,浙江第一家农村信用社成立。2004年4月18日,浙江省农信联社成立,承担对全省农信社的管理、指导、协调和服务职能。2021年10月,经国务院批准同意、银保监会批复,浙江成为全国深化农信社改革“第一单”。2022年4月18日,省委书记袁家军,省委副书记、省

华清 Qt day1 9月15

.pro:QT+=coregui#引入QT所需要的核心库core,gui为图形开发相关类库greaterThan(QT_MAJOR_VERSION,4):QT+=widgets#表示如果超过4.0版本会自动加上widgets类库CONFIG+=c++11#表示支持C++11后的版本#Thefollowingdefine

SMS--短信服务

1短信服务介绍短信服务(ShortMessageService)是阿里云为用户提供的一种通信服务的能力。2短信服务使用接下来,我们使用短信验证码功能来演示短信服务的使用。流程如下:2.1准备工作2.1.1实名认证https://help.aliyun.com/document_detail/48263.html?spm

Algorithm基础算法学习

算法学习贪心算法贪心算法简介:这种算法模式一般将求解过程分成若干个步骤,但每个步骤都应用贪心原则,选取当前状态下最好/最优的选择(局部最有利的选择),并以此希望最后堆叠出的结果也是最好/最优的解。这就好像一个贪婪的人,他事事都想要眼前看到最好的那个,看不到长远的东西,也不为最终的结果和将来着想,贪图眼前局部的利益最大化

热文推荐