赵占旭的博客

Neutron Server 源码分析

我们将从源码的角度查看Neutron Server,使用的版本是Neutron-10.0.0。因为python代码不熟,所以参照了这篇文章

Neutron Server 能力


由上一篇得知,Neutron Server向上提供了API,向下调用Plugin,我们就依次查看一下

  • 入口函数,主要是Neutron Server从哪里调入的。
  • Core API,对外提供管理Network,Subnet和Port的RESTful的API。
  • Extension API,对外提供管理Router, Load Balance, Firewall等资源的RESTful API。
  • Common Service,认证和校验API的请求,这块我们先不关注
  • Neutron Core,Neutron Server的核心处理程序,通过调用相应的Plugin处理请求。这里因为和其他部分结合比较紧密,就不单独拿出来看了。
  • Core Plugin API,定义了Core Plugin的抽象功能集合,Neutron Core通过该API调用相应的Core Plugin。
  • Extension Plugin API, 定义了Service Plugin的抽象功能集合,Neutron Core通过该API调用相应的Service Plugin。
  • Core Plugin,实现了Core Plugin API,在数据库中维护Network, Subnet和Port的状态,并负责调用相应的Agent在Network Provider上执行相关操作,比如创建Network。这块我们后续单独看。
  • Service Plugin,实现了Extension Plugin API,在数据库中维护Router, Load Balance, Security Group等资源的状态,并负责调用相应的Agent在Network Provider上执行相关操作,比如创建Router。这块我们后续也单独看。

入口函数


从setup.cfg看到入口函数是neutron.cmd.eventlet.server:main,对应的文件是neutron/cmd/eventlet/server/__init__.py

main-->server.boot_server-->_main_neutron_server-->wsgi_eventlet.eventlet_wsgi_server

1
2
3
4
5
6
# neutron/server/wsgi_eventlet.py
def eventlet_wsgi_server():
    """Start the Neutron API service, then the plugin and RPC workers.

    Reached through the call chain
    main --> server.boot_server --> _main_neutron_server.
    """
    # Register neutron-api, start its process(es) and begin listening.
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    # Start the plugin and RPC workers using an eventlet green pool.
    start_api_and_rpc_workers(neutron_api)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# neutron/service.py
def serve_wsgi(cls):
    """Create, start and return the WSGI service described by ``cls``.

    :param cls: service class exposing a ``create()`` factory; the caller
        shown in this article passes ``NeutronApiService``.
    :returns: the started service instance.
    """
    # Create the service instance. For NeutronApiService the article notes
    # that app_name = 'neutron'.
    service = cls.create()
    # Start the API server. Per the article, NeutronApiService gets start()
    # from its parent class WsgiService.
    service.start()
    # Notify subscribers of the BEFORE_SPAWN event for the PROCESS resource,
    # passing the service as the trigger.
    registry.notify(resources.PROCESS, events.BEFORE_SPAWN, service)
    return service

class WsgiService(object):
    """Excerpt of the WSGI service base class; only ``start`` is shown."""

    def start(self):
        """Load and run the WSGI app named ``self.app_name``."""
        # Delegate to _run_wsgi and keep what it returns on the instance.
        self.wsgi_app = _run_wsgi(self.app_name)

def _run_wsgi(app_name):
    """Load the paste app called ``app_name`` and serve it.

    :param app_name: name of the app section in api-paste.ini.
    :returns: whatever ``run_wsgi_app`` returns for the loaded app.
    """
    # Read the app to load from the api-paste.ini configuration file.
    # The default configuration (explained further below in the article) is:
    #   paste.app_factory = neutron.api.v2.router:APIRouter.factory
    # so this step ends up calling the factory() classmethod of
    # neutron.api.v2.router.APIRouter, which builds a router instance and
    # sets up the URLs for the controllers.
    app = config.load_paste_app(app_name)
    # Per the article: this creates the neutron green thread, listens on
    # port 9696 of the host, and creates n API processes, where n comes
    # from the api_workers configuration option.
    return run_wsgi_app(app)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# neutron/server/wsgi_eventlet.py
def start_api_and_rpc_workers(neutron_api):
    """Start all workers and block until the API or the workers exit.

    :param neutron_api: the started API service; its ``wait`` method is run
        in a green thread.
    """
    # NOTE(review): the article's author was unsure here — presumably this
    # creates the processes for the plugins and RPC; the plugin worker count
    # is said to depend on service_plugins and the RPC worker count on
    # rpc_workers in the configuration file. Confirm against
    # service.start_all_workers.
    worker_launcher = service.start_all_workers()

    # Spawn one green thread per service; each simply waits for its service
    # to finish.
    pool = eventlet.GreenPool()
    api_thread = pool.spawn(neutron_api.wait)
    plugin_workers_thread = pool.spawn(worker_launcher.wait)

    # Make sure that when one of the two services exits, the other one is
    # killed as well.
    api_thread.link(lambda gt: plugin_workers_thread.kill())
    plugin_workers_thread.link(lambda gt: api_thread.kill())

    pool.waitall()

Core API & Extension API


Core API主要对外提供管理Network, Subnet和Port等的RESTful API。
Extension API对外提供管理Router, Load Balance, Firewall等资源的RESTful API。
我们可以通过命令行查看API。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
curl http://localhost:9696/v2.0/

{"resources": [
{"links": [{"href": "http://localhost:9696/v2.0/subnets", "rel": "self"}],
"name": "subnet", "collection": "subnets"},
{"links": [{"href": "http://localhost:9696/v2.0/subnetpools", "rel": "self"}],
"name": "subnetpool", "collection": "subnetpools"},
{"links": [{"href": "http://localhost:9696/v2.0/networks", "rel": "self"}],
"name": "network", "collection": "networks"},
{"links": [{"href": "http://localhost:9696/v2.0/ports", "rel": "self"}],
"name": "port", "collection": "ports"}]}

curl http://localhost:9696/v2.0/extensions

{"extensions": [
{"alias": "default-subnetpools", "updated": "2016-02-18T18:00:00-00:00", "name": "Default Subnetpools", "links": [], "description": "Provides ability to mark and use a subnetpool as the default"},
{"alias": "network-ip-availability", "updated": "2015-09-24T00:00:00-00:00", "name": "Network IP Availability", "links": [], "description": "Provides IP availability data for each network and subnet."},
{"alias": "network_availability_zone", "updated": "2015-01-01T10:00:00-00:00", "name": "Network Availability Zone", "links": [], "description": "Availability zone support for network."},
{"alias": "auto-allocated-topology", "updated": "2016-01-01T00:00:00-00:00", "name": "Auto Allocated Topology Services", "links": [], "description": "Auto Allocated Topology Services."},
{"alias": "ext-gw-mode", "updated": "2013-03-28T10:00:00-00:00", "name": "Neutron L3 Configurable external gateway mode", "links": [], "description": "Extension of the router abstraction for specifying whether SNAT should occur on the external gateway"},
{"alias": "binding", "updated": "2014-02-03T10:00:00-00:00", "name": "Port Binding", "links": [], "description": "Expose port bindings of a virtual port to external application"},
{"alias": "metering", "updated": "2013-06-12T10:00:00-00:00", "name": "Neutron Metering", "links": [], "description": "Neutron Metering extension."},
{"alias": "agent", "updated": "2013-02-03T10:00:00-00:00", "name": "agent", "links": [], "description": "The agent management extension."},
{"alias": "subnet_allocation", "updated": "2015-03-30T10:00:00-00:00", "name": "Subnet Allocation", "links": [], "description": "Enables allocation of subnets from a subnet pool"},
{"alias": "l3_agent_scheduler", "updated": "2013-02-07T10:00:00-00:00", "name": "L3 Agent Scheduler", "links": [], "description": "Schedule routers among l3 agents"},
{"alias": "tag", "updated": "2016-01-01T00:00:00-00:00", "name": "Tag support", "links": [], "description": "Enables to set tag on resources."},
{"alias": "external-net", "updated": "2013-01-14T10:00:00-00:00", "name": "Neutron external network", "links": [], "description": "Adds external network attribute to network resource."},
{"alias": "flavors", "updated": "2015-09-17T10:00:00-00:00", "name": "Neutron Service Flavors", "links": [], "description": "Flavor specification for Neutron advanced services"},
{"alias": "net-mtu", "updated": "2015-03-25T10:00:00-00:00", "name": "Network MTU", "links": [], "description": "Provides MTU attribute for a network resource."},
{"alias": "availability_zone", "updated": "2015-01-01T10:00:00-00:00", "name": "Availability Zone", "links": [], "description": "The availability zone extension."},
{"alias": "quotas", "updated": "2012-07-29T10:00:00-00:00", "name": "Quota management support", "links": [], "description": "Expose functions for quotas management per tenant"},
{"alias": "l3-ha", "updated": "2014-04-26T00:00:00-00:00", "name": "HA Router extension", "links": [], "description": "Add HA capability to routers."},
{"alias": "provider", "updated": "2012-09-07T10:00:00-00:00", "name": "Provider Network", "links": [], "description": "Expose mapping of virtual networks to physical networks"},
{"alias": "multi-provider", "updated": "2013-06-27T10:00:00-00:00", "name": "Multi Provider Network", "links": [], "description": "Expose mapping of virtual networks to multiple physical networks"},
{"alias": "address-scope", "updated": "2015-07-26T10:00:00-00:00", "name": "Address scope", "links": [], "description": "Address scopes extension."},
{"alias": "extraroute", "updated": "2013-02-01T10:00:00-00:00", "name": "Neutron Extra Route", "links": [], "description": "Extra routes configuration for L3 router"},
{"alias": "subnet-service-types", "updated": "2016-03-15T18:00:00-00:00", "name": "Subnet service types", "links": [], "description": "Provides ability to set the subnet service_types field"},
{"alias": "standard-attr-timestamp", "updated": "2016-09-12T10:00:00-00:00", "name": "Resource timestamps", "links": [], "description": "Adds created_at and updated_at fields to all Neutron resources that have Neutron standard attributes."},
{"alias": "service-type", "updated": "2013-01-20T00:00:00-00:00", "name": "Neutron Service Type Management", "links": [], "description": "API for retrieving service providers for Neutron advanced services"},
{"alias": "l3-flavors", "updated": "2016-05-17T00:00:00-00:00", "name": "Router Flavor Extension", "links": [], "description": "Flavor support for routers."},
{"alias": "port-security", "updated": "2012-07-23T10:00:00-00:00", "name": "Port Security", "links": [], "description": "Provides port security"},
{"alias": "extra_dhcp_opt", "updated": "2013-03-17T12:00:00-00:00", "name": "Neutron Extra DHCP opts", "links": [], "description": "Extra options configuration for DHCP. For example PXE boot options to DHCP clients can be specified (e.g. tftp-server, server-ip-address, bootfile-name)"},
{"alias": "standard-attr-revisions", "updated": "2016-04-11T10:00:00-00:00", "name": "Resource revision numbers", "links": [], "description": "This extension will display the revision number of neutron resources."},
{"alias": "pagination", "updated": "2016-06-12T00:00:00-00:00", "name": "Pagination support", "links": [], "description": "Extension that indicates that pagination is enabled."},
{"alias": "sorting", "updated": "2016-06-12T00:00:00-00:00", "name": "Sorting support", "links": [], "description": "Extension that indicates that sorting is enabled."},
{"alias": "security-group", "updated": "2012-10-05T10:00:00-00:00", "name": "security-group", "links": [], "description": "The security groups extension."},
{"alias": "dhcp_agent_scheduler", "updated": "2013-02-07T10:00:00-00:00", "name": "DHCP Agent Scheduler", "links": [], "description": "Schedule networks among dhcp agents"},
{"alias": "router_availability_zone", "updated": "2015-01-01T10:00:00-00:00", "name": "Router Availability Zone", "links": [], "description": "Availability zone support for router."},
{"alias": "rbac-policies", "updated": "2015-06-17T12:15:12-00:00", "name": "RBAC Policies", "links": [], "description": "Allows creation and modification of policies that control tenant access to resources."},
{"alias": "tag-ext", "updated": "2017-01-01T00:00:00-00:00", "name": "Tag support for resources: subnet, subnetpool, port, router", "links": [], "description": "Extends tag support to more L2 and L3 resources."},
{"alias": "standard-attr-description", "updated": "2016-02-10T10:00:00-00:00", "name": "standard-attr-description", "links": [], "description": "Extension to add descriptions to standard attributes"},
{"alias": "router", "updated": "2012-07-20T10:00:00-00:00", "name": "Neutron L3 Router", "links": [], "description": "Router abstraction for basic L3 forwarding between L2 Neutron networks and access to external networks via a NAT gateway."},
{"alias": "allowed-address-pairs", "updated": "2013-07-23T10:00:00-00:00", "name": "Allowed Address Pairs", "links": [], "description": "Provides allowed address pairs"},
{"alias": "project-id", "updated": "2016-09-09T09:09:09-09:09", "name": "project_id field enabled", "links": [], "description": "Extension that indicates that project_id field is enabled."},
{"alias": "dvr", "updated": "2014-06-1T10:00:00-00:00", "name": "Distributed Virtual Router", "links": [], "description": "Enables configuration of Distributed Virtual Routers."}]}

接下来我们看一下代码,从上面可以看到_run_wsgi(app_name)主要负责API的服务。
我们分别看一下,首先是app = config.load_paste_app(app_name)的操作,主要是根据配置文件api-paste.ini获取需要加载的app,配置文件相关信息如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions_composite
/v2.0: neutronapi_v2_0

# neutron/api/versions.py中class Versions中是v2.0
[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = cors http_proxy_to_wsgi request_id catch_errors extensions neutronapiapp_v2_0
keystone = cors http_proxy_to_wsgi request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

从配置文件可知,我们需要加载的app是v2版本的API(APIRouter)。从下面的代码可以看到,这个app负责创建API,不过这一步只是生成了API的URL和路由(创建出的资源如上面查看到的API所示),此时还没有真正开始监听端口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# neutron/api/v2/router.py
class APIRouter(base_wsgi.Router):
    """WSGI router that maps the v2.0 core API URLs to controllers."""

    @classmethod
    def factory(cls, global_config, **local_config):
        """Paste app factory: build an APIRouter and return it as the app.

        ``global_config`` is accepted but not used; ``local_config`` is
        forwarded to ``__init__``, which is where the real work happens.
        """
        return cls(**local_config)

    def __init__(self, **local_config):
        """Build the core API: one controller and URL set per resource."""
        # Create a mapper that holds the URL -> controller mapping.
        mapper = routes_mapper.Mapper()
        # Per the article: NeutronManager creates the core plugin (the
        # author's configuration has core_plugin = ml2) and loads
        # service_plugins = router,metering,qos plus the default plugins
        # listed in DEFAULT_SERVICE_PLUGINS in
        # neutron/plugins/common/constants.py.
        manager.init()
        # With the configuration above, the plugin obtained here is ml2.
        plugin = directory.get_plugin()
        # Per the article: loads, from the neutron/extensions directory, the
        # extensions for the service plugins mentioned above.
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        # Let the extensions extend RESOURCE_ATTRIBUTE_MAP
        # (defined in neutron/api/v2/attributes.py).
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

        # COLLECTION_ACTIONS = ['index', 'create']
        # MEMBER_ACTIONS = ['show', 'update', 'delete']
        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)

        def _map_resource(collection, resource, params, parent=None):
            """Create the controller for one resource and register its URLs.

            When ``parent`` is given, the URLs are nested under
            /<parent collection>/{<parent member>_id}/<collection>.
            """
            allow_bulk = cfg.CONF.allow_bulk
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=True,
                allow_sorting=True)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

        # Map the root URL to an index of the core resources.
        # RESOURCES = {'network': 'networks', 'subnet': 'subnets',
        #              'subnetpool': 'subnetpools', 'port': 'ports'}
        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            # Build the URLs for each core resource; the attributes come
            # from RESOURCE_ATTRIBUTE_MAP above. For example:
            #   curl http://localhost:9696/v2.0/subnets
            #   {"subnets": []}
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))
            # NOTE(review): the article describes this as registering an
            # event that watches for database changes; the call itself only
            # shows the resource being registered by name — confirm what
            # resource_registry does with it.
            resource_registry.register_resource_by_name(resource)

        for resource in SUB_RESOURCES:
            # Sub-resources are mapped the same way, nested under a parent.
            _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])

        policy.reset()
        # Initialise request routing with the populated mapper.
        super(APIRouter, self).__init__(mapper)

接下来我们需要看一下run_wsgi_app(app)的操作,主要是根据配置创建n个API Worker的进程,然后进行监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# neutron/service.py
def run_wsgi_app(app):
    """Create a WSGI server named "Neutron", start it with ``app``, return it.

    :param app: the WSGI application to serve.
    :returns: the started ``wsgi.Server``.
    """
    # Build the server object (see class Server in neutron/wsgi.py).
    server = wsgi.Server("Neutron")
    # Listen on bind_host:bind_port and answer API requests. Several API
    # workers may be created; the article does not cover how requests are
    # scheduled between them.
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=_get_api_workers())
    return server

# neutron/wsgi.py
class Server(object):
    """Excerpt of the WSGI server: binds a socket and launches API workers."""

    def __init__(self, name, num_threads=None, disable_ssl=False):
        """Store the server settings.

        :param name: server name.
        :param num_threads: pool size; falls back to
            ``CONF.wsgi_default_pool_size`` when falsy.
        :param disable_ssl: when true, the SSL check below is skipped.
        """
        eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
        self.num_threads = num_threads or CONF.wsgi_default_pool_size
        self.disable_ssl = disable_ssl
        # Green thread pool of size 1.
        self.pool = eventlet.GreenPool(1)
        self.name = name
        self._server = None
        # A falsy configured timeout is stored as None.
        self.client_socket_timeout = CONF.client_socket_timeout or None
        if CONF.use_ssl and not self.disable_ssl:
            sslutils.is_enabled(CONF)

    def start(self, application, port, host='0.0.0.0', workers=0):
        """Bind the listening socket and launch the workers.

        :param application: the WSGI application to serve.
        :param port: port to listen on.
        :param host: address to bind to.
        :param workers: worker count handed on to ``_launch``.
        """
        self._host = host
        self._port = port
        backlog = CONF.backlog

        # Create the listening socket on host:port (bind_host and bind_port
        # in the configuration file) and set its options.
        self._socket = self._get_socket(self._host,
                                        self._port,
                                        backlog=backlog)
        self._launch(application, workers)

    def _get_socket(self, host, port, backlog):
        """Return a listening socket bound to ``host``:``port``."""
        bind_addr = (host, port)
        # Resolve the address; the first result supplies the address family
        # and the socket address to bind to.
        info = socket.getaddrinfo(bind_addr[0],
                                  bind_addr[1],
                                  socket.AF_UNSPEC,
                                  socket.SOCK_STREAM)[0]
        family = info[0]
        bind_addr = info[-1]

        sock = None
        # Create the listening socket.
        sock = eventlet.listen(bind_addr,
                               backlog=backlog,
                               family=family)
        # Allow address reuse.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Enable keepalive.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        # Set the keepalive idle time where the platform supports it.
        if hasattr(socket, 'TCP_KEEPIDLE'):
            sock.setsockopt(socket.IPPROTO_TCP,
                            socket.TCP_KEEPIDLE,
                            CONF.tcp_keepidle)

        return sock

    def _launch(self, application, workers=0):
        """Wrap ``application`` in a WorkerService and launch its processes."""
        service = WorkerService(self, application, self.disable_ssl, workers)
        # NOTE(review): presumably disposes the DB connection pool before
        # the worker processes are created — confirm.
        api.context_manager.dispose_pool()
        # Create the process launcher (the article describes this step as
        # creating a pipe).
        self._server = common_service.ProcessLauncher(cfg.CONF,
                                                      wait_interval=1.0)
        # Start the n API worker processes.
        self._server.launch_service(service,
                                    workers=service.worker_process_count)

Core Plugin API & Extension Plugin API


未完待续。。。其实是因为没时间看这块了。。。

注意:所有文章非特别说明皆为原创。为保证信息与源同步,转载时请务必注明文章出处!谢谢合作 :-)

原始链接:http://zhaozhanxu.com/2017/06/11/OPENSTACK/2017-06-11-neutron2/

许可协议: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。