我们将从源码的角度查看Neutron Server,使用的版本是Neutron-10.0.0。因为python代码不熟,所以参照了这篇文章

Neutron Server 能力


由上一篇得知,Neutron Server向上提供了API,向下调用Plugin,我们就依次查看一下

  • 入口函数,主要是Neutron Server从哪里调入的。
  • Core API,对外提供管理Network,Subnet和Port的RESTful的API。
  • Extension API,对外提供管理Router, Load Balance, Firewall等资源的RESTful API。
  • Common Service,认证和校验API的请求,这块我们先不关注
  • Neutron Core,Neutron Server的核心处理程序,通过调用相应的Plugin处理请求。这里因为和其他部分结合比较紧密,就不单独拿出来看了。
  • Core Plugin API,定义了Core Plgin的抽象功能集合,Neutron Core通过该API调用相应的Core Plugin。
  • Extension Plugin API, 定义了Service Plugin的抽象功能集合,Neutron Core通过该API调用相应的Service Plugin。
  • Core Plugin,实现了Core Plugin API,在数据库中维护network, Subnet和Port的状态,并负责调用相应的Agent在Network Provider上执行相关操作,比如创建Network。这块我们单独后续单独看。
  • Service Plugin,实现了Extension Plugin API,在数据库中维护Router, Load Balance, Security Group等资源的状态,并负责调用相应的Agent在Network Provider上执行相关操作,比如创建Router。这块我们后续也单独看。

入口函数


从setup.cfg看到入口函数位于neutron.cmd.eventlet.server:main,位于neutron/cmd/server/__init__.py

main-->server.boot_server-->_main_neutron_server-->wsgi_eventlet.eventlet_wsgi_server

# neutron/server/wsgi_eventlet.py
def eventlet_wsgi_server():
    # 注册neutron-api,并且启动相应的进程,并且进行监听
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    # 利用eventlet线程池启动Plugin和RPC
    start_api_and_rpc_workers(neutron_api)
# neutron/service.py
def serve_wsgi(cls):
    # 创建一个class NeutronApiService实例
    # app_name = 'neutron'
    service = cls.create()
    # 启动API Server
    # 此处继承了class NeutronApiService的父类class WsgiService
    service.start()
    registry.notify(resources.PROCESS, events.BEFORE_SPAWN, service)
    return service

class WsgiService(object):
    def start(self):
        # 调用_run_wsgi加载
        self.wsgi_app = _run_wsgi(self.app_name)

def _run_wsgi(app_name):
    # 先从配置文件api-paste.ini读取出要加载的app
    # 默认配置的app如下,下面会稍微介绍一下为何
    # paste.app_factory =  neutron.api.v2.router:APIRouter.factory
    # 所以这一步实际调用的是 neutron.api.v2.router ApiRouter实例
    # 的factory生成一个router实例
    # 最终创建一些controller的URL
    app = config.load_paste_app(app_name)
    # 创建neutron的协程
    # 监听host的9696端口
    # 创建n个api进程,取决于配置文件api_workers
    return run_wsgi_app(app)
# neutron/server/wsgi_eventlet.py
def start_api_and_rpc_workers(neutron_api):
    # 可能是为plugin和RPC创建进程
    # plugin的worker数量取决于配置文件的service_plugins
    # RPC的数量取决于配置文件的rpc_workers
    worker_launcher = service.start_all_workers()

    # 创建协程坐等两个服务结束
    pool = eventlet.GreenPool()
    api_thread = pool.spawn(neutron_api.wait)
    plugin_workers_thread = pool.spawn(worker_launcher.wait)

    # 确保其中一个服务挂掉后,结束另外一个
    api_thread.link(lambda gt: plugin_workers_thread.kill())
    plugin_workers_thread.link(lambda gt: api_thread.kill())

    pool.waitall()

Core API & Extension API


Core API主要对外提供管理Network, Subnet和Port等的RESTful API。
Extension API对外提供管理Router, Load Balance, Firewall等资源的RESTful API。
我们可以通过命令行查看API。

curl http://localhost:9696/v2.0/

{"resources": [
   {"links": [{"href": "http://localhost:9696/v2.0/subnets", "rel": "self"}],
       "name": "subnet", "collection": "subnets"},
   {"links": [{"href": "http://localhost:9696/v2.0/subnetpools", "rel": "self"}],
       "name": "subnetpool", "collection": "subnetpools"},
   {"links": [{"href": "http://localhost:9696/v2.0/networks", "rel": "self"}],
       "name": "network", "collection": "networks"},
   {"links": [{"href": "http://localhost:9696/v2.0/ports", "rel": "self"}],
       "name": "port", "collection": "ports"}]}

curl http://localhost:9696/v2.0/extensions

{"extensions": [
    {"alias": "default-subnetpools", "updated": "2016-02-18T18:00:00-00:00", "name": "Default Subnetpools", "links": [], "description": "Provides ability to mark and use a subnetpool as the default"},
    {"alias": "network-ip-availability", "updated": "2015-09-24T00:00:00-00:00", "name": "Network IP Availability", "links": [], "description": "Provides IP availability data for each network and subnet."},
    {"alias": "network_availability_zone", "updated": "2015-01-01T10:00:00-00:00", "name": "Network Availability Zone", "links": [], "description": "Availability zone support for network."},
    {"alias": "auto-allocated-topology", "updated": "2016-01-01T00:00:00-00:00", "name": "Auto Allocated Topology Services", "links": [], "description": "Auto Allocated Topology Services."},
    {"alias": "ext-gw-mode", "updated": "2013-03-28T10:00:00-00:00", "name": "Neutron L3 Configurable external gateway mode", "links": [], "description": "Extension of the router abstraction for specifying whether SNAT should occur on the external gateway"},
    {"alias": "binding", "updated": "2014-02-03T10:00:00-00:00", "name": "Port Binding", "links": [], "description": "Expose port bindings of a virtual port to external application"},
    {"alias": "metering", "updated": "2013-06-12T10:00:00-00:00", "name": "Neutron Metering", "links": [], "description": "Neutron Metering extension."},
    {"alias": "agent", "updated": "2013-02-03T10:00:00-00:00", "name": "agent", "links": [], "description": "The agent management extension."},
    {"alias": "subnet_allocation", "updated": "2015-03-30T10:00:00-00:00", "name": "Subnet Allocation", "links": [], "description": "Enables allocation of subnets from a subnet pool"},
    {"alias": "l3_agent_scheduler", "updated": "2013-02-07T10:00:00-00:00", "name": "L3 Agent Scheduler", "links": [], "description": "Schedule routers among l3 agents"},
    {"alias": "tag", "updated": "2016-01-01T00:00:00-00:00", "name": "Tag support", "links": [], "description": "Enables to set tag on resources."},
    {"alias": "external-net", "updated": "2013-01-14T10:00:00-00:00", "name": "Neutron external network", "links": [], "description": "Adds external network attribute to network resource."},
    {"alias": "flavors", "updated": "2015-09-17T10:00:00-00:00", "name": "Neutron Service Flavors", "links": [], "description": "Flavor specification for Neutron advanced services"},
    {"alias": "net-mtu", "updated": "2015-03-25T10:00:00-00:00", "name": "Network MTU", "links": [], "description": "Provides MTU attribute for a network resource."},
    {"alias": "availability_zone", "updated": "2015-01-01T10:00:00-00:00", "name": "Availability Zone", "links": [], "description": "The availability zone extension."},
    {"alias": "quotas", "updated": "2012-07-29T10:00:00-00:00", "name": "Quota management support", "links": [], "description": "Expose functions for quotas management per tenant"},
    {"alias": "l3-ha", "updated": "2014-04-26T00:00:00-00:00", "name": "HA Router extension", "links": [], "description": "Add HA capability to routers."},
    {"alias": "provider", "updated": "2012-09-07T10:00:00-00:00", "name": "Provider Network", "links": [], "description": "Expose mapping of virtual networks to physical networks"},
    {"alias": "multi-provider", "updated": "2013-06-27T10:00:00-00:00", "name": "Multi Provider Network", "links": [], "description": "Expose mapping of virtual networks to multiple physical networks"},
    {"alias": "address-scope", "updated": "2015-07-26T10:00:00-00:00", "name": "Address scope", "links": [], "description": "Address scopes extension."},
    {"alias": "extraroute", "updated": "2013-02-01T10:00:00-00:00", "name": "Neutron Extra Route", "links": [], "description": "Extra routes configuration for L3 router"},
    {"alias": "subnet-service-types", "updated": "2016-03-15T18:00:00-00:00", "name": "Subnet service types", "links": [], "description": "Provides ability to set the subnet service_types field"},
    {"alias": "standard-attr-timestamp", "updated": "2016-09-12T10:00:00-00:00", "name": "Resource timestamps", "links": [], "description": "Adds created_at and updated_at fields to all Neutron resources that have Neutron standard attributes."},
    {"alias": "service-type", "updated": "2013-01-20T00:00:00-00:00", "name": "Neutron Service Type Management", "links": [], "description": "API for retrieving service providers for Neutron advanced services"},
    {"alias": "l3-flavors", "updated": "2016-05-17T00:00:00-00:00", "name": "Router Flavor Extension", "links": [], "description": "Flavor support for routers."},
    {"alias": "port-security", "updated": "2012-07-23T10:00:00-00:00", "name": "Port Security", "links": [], "description": "Provides port security"},
    {"alias": "extra_dhcp_opt", "updated": "2013-03-17T12:00:00-00:00", "name": "Neutron Extra DHCP opts", "links": [], "description": "Extra options configuration for DHCP. For example PXE boot options to DHCP clients can be specified (e.g. tftp-server, server-ip-address, bootfile-name)"},
    {"alias": "standard-attr-revisions", "updated": "2016-04-11T10:00:00-00:00", "name": "Resource revision numbers", "links": [], "description": "This extension will display the revision number of neutron resources."},
    {"alias": "pagination", "updated": "2016-06-12T00:00:00-00:00", "name": "Pagination support", "links": [], "description": "Extension that indicates that pagination is enabled."},
    {"alias": "sorting", "updated": "2016-06-12T00:00:00-00:00", "name": "Sorting support", "links": [], "description": "Extension that indicates that sorting is enabled."},
    {"alias": "security-group", "updated": "2012-10-05T10:00:00-00:00", "name": "security-group", "links": [], "description": "The security groups extension."},
    {"alias": "dhcp_agent_scheduler", "updated": "2013-02-07T10:00:00-00:00", "name": "DHCP Agent Scheduler", "links": [], "description": "Schedule networks among dhcp agents"},
    {"alias": "router_availability_zone", "updated": "2015-01-01T10:00:00-00:00", "name": "Router Availability Zone", "links": [], "description": "Availability zone support for router."},
    {"alias": "rbac-policies", "updated": "2015-06-17T12:15:12-00:00", "name": "RBAC Policies", "links": [], "description": "Allows creation and modification of policies that control tenant access to resources."},
    {"alias": "tag-ext", "updated": "2017-01-01T00:00:00-00:00", "name": "Tag support for resources: subnet, subnetpool, port, router", "links": [], "description": "Extends tag support to more L2 and L3 resources."},
    {"alias": "standard-attr-description", "updated": "2016-02-10T10:00:00-00:00", "name": "standard-attr-description", "links": [], "description": "Extension to add descriptions to standard attributes"},
    {"alias": "router", "updated": "2012-07-20T10:00:00-00:00", "name": "Neutron L3 Router", "links": [], "description": "Router abstraction for basic L3 forwarding between L2 Neutron networks and access to external networks via a NAT gateway."},
    {"alias": "allowed-address-pairs", "updated": "2013-07-23T10:00:00-00:00", "name": "Allowed Address Pairs", "links": [], "description": "Provides allowed address pairs"},
    {"alias": "project-id", "updated": "2016-09-09T09:09:09-09:09", "name": "project_id field enabled", "links": [], "description": "Extension that indicates that project_id field is enabled."},
    {"alias": "dvr", "updated": "2014-06-1T10:00:00-00:00", "name": "Distributed Virtual Router", "links": [], "description": "Enables configuration of Distributed Virtual Routers."}]}

接下来我们看一下代码,从上面可以看到_run_wsgi(app_name)主要负责API的服务。
我们分别看一下,首先是app = config.load_paste_app(app_name)的操作,主要是根据配置文件api-paste.ini获取需要加载的app,配置文件相关信息如下:

[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions_composite
/v2.0: neutronapi_v2_0

# neutron/api/versions.py中class Versions中是v2.0
[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = cors http_proxy_to_wsgi request_id catch_errors extensions neutronapiapp_v2_0
keystone = cors http_proxy_to_wsgi request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

从配置文件知道我们需要加载的app是v2版本的API创建,我们从下面的代码可以得知,我们这个应用是来创建API的,只不过这里只是生成了API的URL和路由,创建的信息如上面查看的API所示,但是并没有进行正常的监听。

# neutron/api/v2/router.py
class APIRouter(base_wsgi.Router):
    # 方法构造了一个APIRouter对象作为app返回
    # 所以我们需要分析一下构造函数__init__
    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(**local_config)

    # Core API的构造,下面详细分析
    def __init__(self, **local_config):
        # 创建一个router,构造URL和controller的映射
        mapper = routes_mapper.Mapper()
        # class NeutronManager会创建一个core plugin
        # 我们的配置core_plugin = ml2
        # 并且加载service_plugins = router,metering,qos
        # service_plugins还要加上默认的plugins
        # 默认的在neutron/plugins/common/constants.py的
        # DEFAULT_SERVICE_PLUGINS
        manager.init()
        # 所以这里获取的plugin是ml2的plugin
        plugin = directory.get_plugin()
        # 从neutron/extensions目录下加载所有上面提到的service_plugins
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        # 扩展资源neutron/api/v2/attributes.py中的RESOURCE_ATTRIBUTE_MAP
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

        # COLLECTION_ACTIONS = ['index', 'create']
        # MEMBER_ACTIONS = ['show', 'update', 'delete']
        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)

        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=True,
                allow_sorting=True)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

        # 构造根URL对应的信息
        # RESOURCES = {'network': 'networks', 'subnet': 'subnets',
        #              'subnetpool': 'subnetpools', 'port': 'ports'}
        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            # 分别构造上面的几个URL,信息来自于上面的RESOURCE_ATTRIBUTE_MAP信息
            # 其中一个查看 curl http://localhost:9696/v2.0/subnets
            # {"subnets": []}
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))
            # 注册事件,监听数据库的变动
            resource_registry.register_resource_by_name(resource)

        for resource in SUB_RESOURCES:
            _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])

        policy.reset()
        # 初始化路由请求
        super(APIRouter, self).__init__(mapper)

接下来我们需要看一下run_wsgi_app(app)的操作,主要是根据配置创建n个API Worker的进程,然后进行监听。

# neutron/service.py
def run_wsgi_app(app):
    # 构造一个server如下,主要创建一个neutron协程
    server = wsgi.Server("Neutron")
    # 进行端口监听,响应API的请求
    # 因为可以创建多个api worker,目前没细看怎么调度
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=_get_api_workers())
    return server

# neutron/wsgi.py
class Server(object):
    def __init__(self, name, num_threads=None, disable_ssl=False):
        eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
        self.num_threads = num_threads or CONF.wsgi_default_pool_size
        self.disable_ssl = disable_ssl
        # 创建协程
        self.pool = eventlet.GreenPool(1)
        self.name = name
        self._server = None
        self.client_socket_timeout = CONF.client_socket_timeout or None
        if CONF.use_ssl and not self.disable_ssl:
            sslutils.is_enabled(CONF)

    def start(self, application, port, host='0.0.0.0', workers=0):
        self._host = host
        self._port = port
        backlog = CONF.backlog

        # 创建监听,host:port(配置文件bind_host和bind_port)
        # 以及一些特性的配置
        self._socket = self._get_socket(self._host,
                                        self._port,
                                        backlog=backlog)
        self._launch(application, workers)

    def _get_socket(self, host, port, backlog):
        bind_addr = (host, port)
        # 获取网络信息
        info = socket.getaddrinfo(bind_addr[0],
                                  bind_addr[1],
                                  socket.AF_UNSPEC,
                                  socket.SOCK_STREAM)[0]
        family = info[0]
        bind_addr = info[-1]

        sock = None
        # 创建监听
        sock = eventlet.listen(bind_addr,
                               backlog=backlog,
                               family=family)
        # 允许重用
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 支持keepalive
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        # 配置idle time
        if hasattr(socket, 'TCP_KEEPIDLE'):
            sock.setsockopt(socket.IPPROTO_TCP,
                            socket.TCP_KEEPIDLE,
                            CONF.tcp_keepidle)

        return sock

    def _launch(self, application, workers=0):
        service = WorkerService(self, application, self.disable_ssl, workers)
        api.context_manager.dispose_pool()
        # 创建管道
        self._server = common_service.ProcessLauncher(cfg.CONF,
                                                      wait_interval=1.0)
        # 启动n个api workers进程
        self._server.launch_service(service,
                                    workers=service.worker_process_count)

Core API & Extension API


未完待续。。。其实是因为没时间看这块了。。。

最后修改:2021 年 08 月 18 日
如果觉得我的文章对你有用,请随意赞赏