1.nova/api/openstack/compute/servers.py create()
在create函数前面包含三种装饰器,分别为:@wsgi.response、@wsgi.expected_errors、@validation.schema
(1)@wsgi.response属于限制装饰器,限制请求完成后的成功状态码202
(2)@wsgi.expected_errors属于限制装饰器,限制请求完成后的失败状态码400、403、409
(3)@validation.schema属于验证装饰器,在请求create函数开始前拦截http请求,对http请求中的参数进行校验,若校验成功则进入create,若失败则返回失败状态码,此装饰器主要功能是进行版本兼容
create函数中主要分为两个功能
(1)收集并校验参数,为后续创建虚拟机做准备
(2)调用nova/compute/api.py中的create()函数,此函数的主要功能是提供实例并将实例信息发送给调度器
@wsgi.response(202)
@wsgi.expected_errors((400, 403, 409))
@validation.schema(schema_servers.base_create_v20, '2.0', '2.0')
@validation.schema(schema_servers.base_create, '2.1', '2.18')
@validation.schema(schema_servers.base_create_v219, '2.19', '2.31')
@validation.schema(schema_servers.base_create_v232, '2.32', '2.32')
@validation.schema(schema_servers.base_create_v233, '2.33', '2.36')
@validation.schema(schema_servers.base_create_v237, '2.37', '2.41')
@validation.schema(schema_servers.base_create_v242, '2.42', '2.51')
@validation.schema(schema_servers.base_create_v252, '2.52', '2.56')
@validation.schema(schema_servers.base_create_v257, '2.57', '2.62')
@validation.schema(schema_servers.base_create_v263, '2.63', '2.66')
@validation.schema(schema_servers.base_create_v267, '2.67', '2.73')
@validation.schema(schema_servers.base_create_v274, '2.74')
def create(self, req, body):
    """Creates a new server for a given user.

    Collects the create arguments from ``body['server']``, runs the
    ``create`` policy checks, then delegates to ``self.compute_api.create``.

    :param req: the WSGI request; ``req.environ['nova.context']`` supplies
        the request context.
    :param body: the request body; must contain a ``server`` dict.
    :returns: a ``wsgi.ResponseObject`` holding only the reservation id
        when ``return_reservation_id`` is set in the request, otherwise
        the view of the first created instance passed through
        ``self._add_location``.
    :raises exc.HTTPBadRequest: on invalid input (e.g. min_count >
        max_count) and for the compute API errors mapped to it below.
    :raises exc.HTTPForbidden: for the quota/port-limit and forbidden
        errors mapped to it below.
    :raises exc.HTTPConflict: for the conflict errors mapped to it below.
    """
    context = req.environ['nova.context']
    server_dict = body['server']
    password = self._get_server_admin_password(server_dict)
    name = common.normalize_name(server_dict['name'])
    # The description defaults to the name; from 2.19 on it is taken from
    # the request instead (and may therefore be None).
    description = name
    if api_version_request.is_supported(req, min_version='2.19'):
        description = server_dict.get('description')

    # Arguments to be passed to instance create function
    create_kwargs = {}

    create_kwargs['user_data'] = server_dict.get('user_data')
    # NOTE(alex_xu): The v2.1 API compat mode, we strip the spaces for
    # keypair create. But we didn't strip spaces at here for
    # backward-compatible some users already created keypair and name with
    # leading/trailing spaces by legacy v2 API.
    create_kwargs['key_name'] = server_dict.get('key_name')
    create_kwargs['config_drive'] = server_dict.get('config_drive')
    security_groups = server_dict.get('security_groups')
    if security_groups is not None:
        create_kwargs['security_groups'] = [
            sg['name'] for sg in security_groups if sg.get('name')]
        # De-duplicate the requested security group names.
        create_kwargs['security_groups'] = list(
            set(create_kwargs['security_groups']))

    scheduler_hints = {}
    if 'os:scheduler_hints' in body:
        scheduler_hints = body['os:scheduler_hints']
    elif 'OS-SCH-HNT:scheduler_hints' in body:
        scheduler_hints = body['OS-SCH-HNT:scheduler_hints']
    create_kwargs['scheduler_hints'] = scheduler_hints

    # min_count and max_count are optional. If they exist, they may come
    # in as strings. Verify that they are valid integers and > 0.
    # Also, we want to default 'min_count' to 1, and default
    # 'max_count' to be 'min_count'.
    min_count = int(server_dict.get('min_count', 1))
    max_count = int(server_dict.get('max_count', min_count))
    if min_count > max_count:
        msg = _('min_count must be <= max_count')
        raise exc.HTTPBadRequest(explanation=msg)
    create_kwargs['min_count'] = min_count
    create_kwargs['max_count'] = max_count

    availability_zone = server_dict.pop("availability_zone", None)

    if api_version_request.is_supported(req, min_version='2.52'):
        create_kwargs['tags'] = server_dict.get('tags')

    helpers.translate_attributes(helpers.CREATE,
                                 server_dict, create_kwargs)

    target = {
        'project_id': context.project_id,
        'user_id': context.user_id,
        'availability_zone': availability_zone}
    context.can(server_policies.SERVERS % 'create', target)

    # Skip policy check for 'create:trusted_certs' if no trusted
    # certificate IDs were provided.
    trusted_certs = server_dict.get('trusted_image_certificates', None)
    if trusted_certs:
        create_kwargs['trusted_certs'] = trusted_certs
        context.can(server_policies.SERVERS % 'create:trusted_certs',
                    target=target)

    parse_az = self.compute_api.parse_availability_zone
    try:
        availability_zone, host, node = parse_az(context,
                                                 availability_zone)
    except exception.InvalidInput as err:
        raise exc.HTTPBadRequest(explanation=six.text_type(err))
    if host or node:
        context.can(server_policies.SERVERS % 'create:forced_host', {})

    if api_version_request.is_supported(req, min_version='2.74'):
        self._process_hosts_for_create(context, target, server_dict,
                                       create_kwargs, host, node)

    self._process_bdms_for_create(
        context, target, server_dict, create_kwargs)

    image_uuid = self._image_from_req_data(server_dict, create_kwargs)

    self._process_networks_for_create(
        context, target, server_dict, create_kwargs)

    flavor_id = self._flavor_id_from_req_data(body)
    try:
        inst_type = flavors.get_flavor_by_flavor_id(
            flavor_id, ctxt=context, read_deleted="no")

        supports_multiattach = common.supports_multiattach_volume(req)
        supports_port_resource_request = (
            common.supports_port_resource_request(req))
        (instances, resv_id) = self.compute_api.create(
            context,
            inst_type,
            image_uuid,
            display_name=name,
            display_description=description,
            availability_zone=availability_zone,
            forced_host=host, forced_node=node,
            metadata=server_dict.get('metadata', {}),
            admin_password=password,
            check_server_group_quota=True,
            supports_multiattach=supports_multiattach,
            supports_port_resource_request=supports_port_resource_request,
            **create_kwargs)
    except (exception.QuotaError,
            exception.PortLimitExceeded) as error:
        raise exc.HTTPForbidden(
            explanation=error.format_message())
    except exception.ImageNotFound:
        msg = _("Can not find requested image")
        raise exc.HTTPBadRequest(explanation=msg)
    except exception.KeypairNotFound:
        msg = _("Invalid key_name provided.")
        raise exc.HTTPBadRequest(explanation=msg)
    except exception.ConfigDriveInvalidValue:
        msg = _("Invalid config_drive provided.")
        raise exc.HTTPBadRequest(explanation=msg)
    except (exception.BootFromVolumeRequiredForZeroDiskFlavor,
            exception.ExternalNetworkAttachForbidden) as error:
        raise exc.HTTPForbidden(explanation=error.format_message())
    except messaging.RemoteError as err:
        msg = "%(err_type)s: %(err_msg)s" % {'err_type': err.exc_type,
                                             'err_msg': err.value}
        raise exc.HTTPBadRequest(explanation=msg)
    except UnicodeDecodeError as error:
        msg = "UnicodeError: %s" % error
        raise exc.HTTPBadRequest(explanation=msg)
    except (exception.ImageNotActive,
            exception.ImageBadRequest,
            exception.ImageNotAuthorized,
            exception.FixedIpNotFoundForAddress,
            exception.FlavorNotFound,
            exception.FlavorDiskTooSmall,
            exception.FlavorMemoryTooSmall,
            exception.InvalidMetadata,
            exception.InvalidVolume,
            exception.MultiplePortsNotApplicable,
            exception.InvalidFixedIpAndMaxCountRequest,
            exception.InstanceUserDataMalformed,
            exception.PortNotFound,
            exception.FixedIpAlreadyInUse,
            exception.SecurityGroupNotFound,
            exception.PortRequiresFixedIP,
            exception.NetworkRequiresSubnet,
            exception.NetworkNotFound,
            exception.InvalidBDM,
            exception.InvalidBDMSnapshot,
            exception.InvalidBDMVolume,
            exception.InvalidBDMImage,
            exception.InvalidBDMBootSequence,
            exception.InvalidBDMLocalsLimit,
            exception.InvalidBDMVolumeNotBootable,
            exception.InvalidBDMEphemeralSize,
            exception.InvalidBDMFormat,
            exception.InvalidBDMSwapSize,
            exception.VolumeTypeNotFound,
            exception.AutoDiskConfigDisabledByImage,
            exception.InstanceGroupNotFound,
            exception.SnapshotNotFound,
            exception.UnableToAutoAllocateNetwork,
            exception.MultiattachNotSupportedOldMicroversion,
            exception.CertificateValidationFailed,
            exception.CreateWithPortResourceRequestOldVersion,
            exception.ComputeHostNotFound) as error:
        raise exc.HTTPBadRequest(explanation=error.format_message())
    except INVALID_FLAVOR_IMAGE_EXCEPTIONS as error:
        raise exc.HTTPBadRequest(explanation=error.format_message())
    except (exception.PortInUse,
            exception.InstanceExists,
            exception.NetworkAmbiguous,
            exception.NoUniqueMatch,
            exception.VolumeTypeSupportNotYetAvailable) as error:
        raise exc.HTTPConflict(explanation=error.format_message())

    # If the caller wanted a reservation_id, return it
    if server_dict.get('return_reservation_id', False):
        return wsgi.ResponseObject({'reservation_id': resv_id})

    server = self._view_builder.create(req, instances[0])

    if CONF.api.enable_instance_password:
        server['server']['adminPass'] = password

    robj = wsgi.ResponseObject(server)

    return self._add_location(robj)
第17行~第111行的功能为收集数据、校验数据,并将数据打包到create_kwargs中
第80行
context.can(server_policies.SERVERS % 'create', target)
最终调用nova/policy.py中authorize()
def authorize(context, action, target=None, do_raise=True, exc=None):
    """Verifies that the action is valid on the target in this context.

    :param context: nova context
    :param action: string representing the action to be checked
        this should be colon separated for clarity.
        i.e. ``compute:create_instance``,
        ``compute:attach_volume``,
        ``volume:attach_volume``
    :param target: dictionary representing the object of the action
        for object creation this should be a dictionary representing the
        location of the object e.g. ``{'project_id': instance.project_id}``
        If None, then this default target will be considered:
        {'project_id': self.project_id, 'user_id': self.user_id}
    :param do_raise: if True (the default), raises PolicyNotAuthorized;
        if False, returns False
    :param exc: Class of the exception to raise if the check fails.
                Any remaining arguments passed to :meth:`authorize` (both
                positional and keyword arguments) will be passed to
                the exception class. If not specified,
                :class:`PolicyNotAuthorized` will be used.

    :raises nova.exception.PolicyNotAuthorized: if verification fails
        and do_raise is True. Or if 'exc' is specified it will raise an
        exception of that type.

    :return: returns a non-False value (not necessarily "True") if
        authorized, and the exact value False if not authorized and
        do_raise is False.
    """
    init()
    credentials = context.to_policy_values()
    if not exc:
        exc = exception.PolicyNotAuthorized

    # Legacy fallback for empty target from context.can()
    # should be removed once we improve testing and scope checks
    if target is None:
        target = default_target(context)

    try:
        result = _ENFORCER.authorize(action, target, credentials,
                                     do_raise=do_raise, exc=exc,
                                     action=action)
    except policy.PolicyNotRegistered:
        # Log, then let save_and_reraise_exception re-raise the error.
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Policy not registered'))
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.debug('Policy check for %(action)s failed with credentials '
                      '%(credentials)s',
                      {'action': action, 'credentials': credentials})
    return result
第30行init()为一个初始化函数,主要功能是创建策略执行器(Enforcer),并注册、加载策略规则和策略配置文件
def init(policy_file=None, rules=None, default_rule=None, use_conf=True):
    """Init an Enforcer class.

    :param policy_file: Custom policy file to use, if none is specified,
                        `CONF.policy_file` will be used.
    :param rules: Default dictionary / Rules to use. It will be
                  considered just in the first instantiation.
    :param default_rule: Default rule to use, CONF.default_rule will
                         be used if none is specified.
    :param use_conf: Whether to load rules from config file.
    """
    global _ENFORCER
    global saved_file_rules

    # The enforcer is created only once; later calls reuse it.
    if not _ENFORCER:
        _ENFORCER = policy.Enforcer(CONF,
                                    policy_file=policy_file,
                                    rules=rules,
                                    default_rule=default_rule,
                                    use_conf=use_conf)
        register_rules(_ENFORCER)
        _ENFORCER.load_rules()

    # Only the rules which are loaded from file may be changed.
    current_file_rules = _ENFORCER.file_rules
    current_file_rules = _serialize_rules(current_file_rules)

    # Checks whether the rules are updated in the runtime
    if saved_file_rules != current_file_rules:
        _warning_for_deprecated_user_based_rules(current_file_rules)
        saved_file_rules = copy.deepcopy(current_file_rules)
第22行register_rules()为注册策略规则功能(详情可参考源码)
跳转至nova/compute/api.py中create()方法
@hooks.add_hook("create_instance")
def create(self, context, instance_type,
           image_href, kernel_id=None, ramdisk_id=None,
           min_count=None, max_count=None,
           display_name=None, display_description=None,
           key_name=None, key_data=None, security_groups=None,
           availability_zone=None, forced_host=None, forced_node=None,
           user_data=None, metadata=None, injected_files=None,
           admin_password=None, block_device_mapping=None,
           access_ip_v4=None, access_ip_v6=None, requested_networks=None,
           config_drive=None, auto_disk_config=None, scheduler_hints=None,
           legacy_bdm=True, shutdown_terminate=False,
           check_server_group_quota=False, tags=None,
           supports_multiattach=False, trusted_certs=None,
           supports_port_resource_request=False,
           requested_host=None, requested_hypervisor_hostname=None):
    """Provision instances, sending instance information to the
    scheduler.  The scheduler will determine where the instance(s)
    go and will handle creating the DB entries.

    Returns a tuple of (instances, reservation_id)

    :raises exception.InvalidRequest: if an availability zone is
        requested without a forced host and it is not in the list of
        available zones.
    """
    # Multi-create with explicit networks needs extra validation.
    if requested_networks and max_count is not None and max_count > 1:
        self._check_multiple_instances_with_specified_ip(
            requested_networks)
        if utils.is_neutron():
            self._check_multiple_instances_with_neutron_ports(
                requested_networks)

    if availability_zone:
        available_zones = availability_zones.get_availability_zones(
            context.elevated(), self.host_api,
            get_only_available=True)
        if (forced_host is None and
                availability_zone not in available_zones):
            msg = _('The requested availability zone is not available')
            raise exception.InvalidRequest(msg)

    filter_properties = scheduler_utils.build_filter_properties(
        scheduler_hints, forced_host, forced_node, instance_type)

    return self._create_instance(
        context, instance_type,
        image_href, kernel_id, ramdisk_id,
        min_count, max_count,
        display_name, display_description,
        key_name, key_data, security_groups,
        availability_zone, user_data, metadata,
        injected_files, admin_password,
        access_ip_v4, access_ip_v6,
        requested_networks, config_drive,
        block_device_mapping, auto_disk_config,
        filter_properties=filter_properties,
        legacy_bdm=legacy_bdm,
        shutdown_terminate=shutdown_terminate,
        check_server_group_quota=check_server_group_quota,
        tags=tags, supports_multiattach=supports_multiattach,
        trusted_certs=trusted_certs,
        supports_port_resource_request=supports_port_resource_request,
        requested_host=requested_host,
        requested_hypervisor_hostname=requested_hypervisor_hostname)
整段代码主要对网络ip、端口进行校验,获取可用区列表并检验请求的可用区是否属于此列表,然后根据调度提示、强制主机/节点和规格构建调度所需的过滤属性(filter_properties),最后调用nova/compute/api.py中的_create_instance()方法
def _create_instance(self, context, instance_type,
                     image_href, kernel_id, ramdisk_id,
                     min_count, max_count,
                     display_name, display_description,
                     key_name, key_data, security_groups,
                     availability_zone, user_data, metadata,
                     injected_files, admin_password, access_ip_v4,
                     access_ip_v6, requested_networks, config_drive,
                     block_device_mapping, auto_disk_config,
                     filter_properties, reservation_id=None,
                     legacy_bdm=True, shutdown_terminate=False,
                     check_server_group_quota=False, tags=None,
                     supports_multiattach=False, trusted_certs=None,
                     supports_port_resource_request=False,
                     requested_host=None,
                     requested_hypervisor_hostname=None):
    """Verify all the input parameters regardless of the provisioning
    strategy being performed and schedule the instance(s) for
    creation.

    :returns: a tuple of (instances, reservation_id)
    :raises exception.CertificateValidationFailed: if trusted
        certificates are requested or configured by default while
        booting without an image (i.e. from volume).
    :raises exception.PortLimitExceeded: if the network quota allows
        fewer instances than ``min_count``.
    """
    # Normalize and setup some parameters
    if reservation_id is None:
        reservation_id = utils.generate_uid('r')
    security_groups = security_groups or ['default']
    min_count = min_count or 1
    max_count = max_count or min_count
    block_device_mapping = block_device_mapping or []
    tags = tags or []

    if image_href:
        image_id, boot_meta = self._get_image(context, image_href)
    else:
        # This is similar to the logic in _retrieve_trusted_certs_object.
        if (trusted_certs or
            (CONF.glance.verify_glance_signatures and
             CONF.glance.enable_certificate_validation and
             CONF.glance.default_trusted_certificate_ids)):
            msg = _("Image certificate validation is not supported "
                    "when booting from volume")
            raise exception.CertificateValidationFailed(message=msg)
        image_id = None
        boot_meta = self._get_bdm_image_metadata(
            context, block_device_mapping, legacy_bdm)

    self._check_auto_disk_config(image=boot_meta,
                                 auto_disk_config=auto_disk_config)

    (base_options, max_net_count, key_pair, security_groups,
     network_metadata) = self._validate_and_build_base_options(
        context, instance_type, boot_meta, image_href, image_id,
        kernel_id, ramdisk_id, display_name, display_description,
        key_name, key_data, security_groups, availability_zone,
        user_data, metadata, access_ip_v4, access_ip_v6,
        requested_networks, config_drive, auto_disk_config,
        reservation_id, max_count, supports_port_resource_request)

    # max_net_count is the maximum number of instances requested by the
    # user adjusted for any network quota constraints, including
    # consideration of connections to each requested network
    if max_net_count < min_count:
        raise exception.PortLimitExceeded()
    elif max_net_count < max_count:
        LOG.info("max count reduced from %(max_count)d to "
                 "%(max_net_count)d due to network port quota",
                 {'max_count': max_count,
                  'max_net_count': max_net_count})
        max_count = max_net_count

    block_device_mapping = self._check_and_transform_bdm(
        context, base_options, instance_type, boot_meta, min_count,
        max_count, block_device_mapping, legacy_bdm)

    # We can't do this check earlier because we need bdms from all sources
    # to have been merged in order to get the root bdm.
    # Set validate_numa=False since numa validation is already done by
    # _validate_and_build_base_options().
    self._checks_for_create_and_rebuild(
        context, image_id, boot_meta, instance_type, metadata,
        injected_files, block_device_mapping.root_bdm(),
        validate_numa=False)

    instance_group = self._get_requested_instance_group(
        context, filter_properties)

    tags = self._create_tag_list_obj(context, tags)

    instances_to_build = self._provision_instances(
        context, instance_type, min_count, max_count, base_options,
        boot_meta, security_groups, block_device_mapping,
        shutdown_terminate, instance_group, check_server_group_quota,
        filter_properties, key_pair, tags, trusted_certs,
        supports_multiattach, network_metadata,
        requested_host, requested_hypervisor_hostname)

    instances = []
    request_specs = []
    build_requests = []
    # Each entry is (request spec, build request, instance mapping); the
    # instance mapping is not used here.
    for rs, build_request, im in instances_to_build:
        build_requests.append(build_request)
        instance = build_request.get_new_instance(context)
        instances.append(instance)
        request_specs.append(rs)

    self.compute_task_api.schedule_and_build_instances(
        context,
        build_requests=build_requests,
        request_spec=request_specs,
        image=boot_meta,
        admin_password=admin_password,
        injected_files=injected_files,
        requested_networks=requested_networks,
        block_device_mapping=block_device_mapping,
        tags=tags)

    return instances, reservation_id
整段代码的作用为收集创建虚机所需的disk、image等信息并进行校验,保证虚机能够顺利创建,最后调用nova/conductor/api.py中schedule_and_build_instances()方法
def schedule_and_build_instances(self, context, build_requests,
                                 request_spec, image,
                                 admin_password, injected_files,
                                 requested_networks, block_device_mapping,
                                 tags=None):
    """Forward the build request to the conductor RPC API.

    All arguments are passed positionally, in the same order, to
    ``self.conductor_compute_rpcapi.schedule_and_build_instances``.
    Nothing is returned.
    """
    self.conductor_compute_rpcapi.schedule_and_build_instances(
        context, build_requests, request_spec, image,
        admin_password, injected_files, requested_networks,
        block_device_mapping, tags)
整段代码只起到缓冲作用,立刻调用nova/conductor/rpcapi.py中schedule_and_build_instances()方法
def schedule_and_build_instances(self, context, build_requests,
                                 request_specs,
                                 image, admin_password, injected_files,
                                 requested_networks,
                                 block_device_mapping,
                                 tags=None):
    """Cast ``schedule_and_build_instances`` over RPC.

    The message is sent at version 1.17; if the client cannot send that
    version it falls back to 1.16 and drops the ``tags`` argument.
    The call is a cast, so nothing is returned.
    """
    version = '1.17'
    kw = {'build_requests': build_requests,
          'request_specs': request_specs,
          'image': jsonutils.to_primitive(image),
          'admin_password': admin_password,
          'injected_files': injected_files,
          'requested_networks': requested_networks,
          'block_device_mapping': block_device_mapping,
          'tags': tags}

    if not self.client.can_send_version(version):
        version = '1.16'
        del kw['tags']

    cctxt = self.client.prepare(version=version)
    cctxt.cast(context, 'schedule_and_build_instances', **kw)
整段代码有两个作用,一是进行版本判断,根据版本调整参数,二是使用rpc调用nova/conductor/manager.py中schedule_and_build_instances()方法
def schedule_and_build_instances(self, context, build_requests,
                                 request_specs, image,
                                 admin_password, injected_files,
                                 requested_networks, block_device_mapping,
                                 tags=None):
    """Schedule the requested instances and start building them.

    Selects hosts through ``self._schedule_instances``, creates each
    instance record in the cell of its selected host, optionally
    rechecks quota, and finally calls
    ``self.compute_rpcapi.build_and_run_instance`` once per instance.
    If scheduling fails the requests are handed to
    ``self._bury_in_cell0`` and the method returns early.
    """
    # Add all the UUIDs for the instances
    instance_uuids = [spec.instance_uuid for spec in request_specs]
    try:
        host_lists = self._schedule_instances(
            context, request_specs[0], instance_uuids,
            return_alternates=True)
    except Exception as exc:
        LOG.exception('Failed to schedule instances')
        self._bury_in_cell0(context, request_specs[0], exc,
                            build_requests=build_requests,
                            block_device_mapping=block_device_mapping,
                            tags=tags)
        return

    host_mapping_cache = {}
    cell_mapping_cache = {}
    instances = []
    host_az = {}  # host=az cache to optimize multi-create

    for (build_request, request_spec, host_list) in six.moves.zip(
            build_requests, request_specs, host_lists):
        instance = build_request.get_new_instance(context)
        # host_list is a list of one or more Selection objects, the first
        # of which has been selected and its resources claimed.
        host = host_list[0]
        # Convert host from the scheduler into a cell record
        if host.service_host not in host_mapping_cache:
            try:
                host_mapping = objects.HostMapping.get_by_host(
                    context, host.service_host)
                host_mapping_cache[host.service_host] = host_mapping
            except exception.HostMappingNotFound as exc:
                LOG.error('No host-to-cell mapping found for selected '
                          'host %(host)s. Setup is incomplete.',
                          {'host': host.service_host})
                self._bury_in_cell0(
                    context, request_spec, exc,
                    build_requests=[build_request], instances=[instance],
                    block_device_mapping=block_device_mapping,
                    tags=tags)
                # This is a placeholder in case the quota recheck fails.
                instances.append(None)
                continue
        else:
            host_mapping = host_mapping_cache[host.service_host]

        cell = host_mapping.cell_mapping

        # Before we create the instance, let's make one final check that
        # the build request is still around and wasn't deleted by the user
        # already.
        try:
            objects.BuildRequest.get_by_instance_uuid(
                context, instance.uuid)
        except exception.BuildRequestNotFound:
            # the build request is gone so we're done for this instance
            LOG.debug('While scheduling instance, the build request '
                      'was already deleted.', instance=instance)
            # This is a placeholder in case the quota recheck fails.
            instances.append(None)
            # If the build request was deleted and the instance is not
            # going to be created, there is no point in leaving an orphan
            # instance mapping so delete it.
            try:
                im = objects.InstanceMapping.get_by_instance_uuid(
                    context, instance.uuid)
                im.destroy()
            except exception.InstanceMappingNotFound:
                pass
            self.report_client.delete_allocation_for_instance(
                context, instance.uuid)
            continue
        else:
            if host.service_host not in host_az:
                host_az[host.service_host] = (
                    availability_zones.get_host_availability_zone(
                        context, host.service_host))
            instance.availability_zone = host_az[host.service_host]
            with obj_target_cell(instance, cell):
                instance.create()
                instances.append(instance)
                cell_mapping_cache[instance.uuid] = cell

    # NOTE(melwitt): We recheck the quota after creating the
    # objects to prevent users from allocating more resources
    # than their allowed quota in the event of a race. This is
    # configurable because it can be expensive if strict quota
    # limits are not required in a deployment.
    if CONF.quota.recheck_quota:
        try:
            compute_utils.check_num_instances_quota(
                context, instance.flavor, 0, 0,
                orig_num_req=len(build_requests))
        except exception.TooManyInstances as exc:
            with excutils.save_and_reraise_exception():
                self._cleanup_build_artifacts(context, exc, instances,
                                              build_requests,
                                              request_specs,
                                              block_device_mapping, tags,
                                              cell_mapping_cache)

    zipped = six.moves.zip(build_requests, request_specs, host_lists,
                           instances)
    for (build_request, request_spec, host_list, instance) in zipped:
        if instance is None:
            # Skip placeholders that were buried in cell0 or had their
            # build requests deleted by the user before instance create.
            continue
        cell = cell_mapping_cache[instance.uuid]
        # host_list is a list of one or more Selection objects, the first
        # of which has been selected and its resources claimed.
        host = host_list.pop(0)
        alts = [(alt.service_host, alt.nodename) for alt in host_list]
        LOG.debug("Selected host: %s; Selected node: %s; Alternates: %s",
                  host.service_host, host.nodename, alts,
                  instance=instance)
        filter_props = request_spec.to_legacy_filter_properties_dict()
        scheduler_utils.populate_retry(filter_props, instance.uuid)
        scheduler_utils.populate_filter_properties(filter_props,
                                                   host)

        # Now that we have a selected host (which has claimed resource
        # allocations in the scheduler) for this instance, we may need to
        # map allocations to resource providers in the request spec.
        try:
            scheduler_utils.fill_provider_mapping(
                context, self.report_client, request_spec, host)
        except Exception as exc:
            # If anything failed here we need to cleanup and bail out.
            with excutils.save_and_reraise_exception():
                self._cleanup_build_artifacts(
                    context, exc, instances, build_requests,
                    request_specs, block_device_mapping, tags,
                    cell_mapping_cache)

        # TODO(melwitt): Maybe we should set_target_cell on the contexts
        # once we map to a cell, and remove these separate with statements.
        with obj_target_cell(instance, cell) as cctxt:
            # send a state update notification for the initial create to
            # show it going from non-existent to BUILDING
            # This can lazy-load attributes on instance.
            notifications.send_update_with_states(
                cctxt, instance, None,
                vm_states.BUILDING, None, None, service="conductor")
            objects.InstanceAction.action_start(
                cctxt, instance.uuid, instance_actions.CREATE,
                want_result=False)
            instance_bdms = self._create_block_device_mapping(
                cell, instance.flavor, instance.uuid, block_device_mapping)
            instance_tags = self._create_tags(cctxt, instance.uuid, tags)

        # TODO(Kevin Zheng): clean this up once instance.create() handles
        # tags; we do this so the instance.create notification in
        # build_and_run_instance in nova-compute doesn't lazy-load tags
        instance.tags = (instance_tags if instance_tags
                         else objects.TagList())

        # Update mapping for instance. Normally this check is guarded by
        # a try/except but if we're here we know that a newer nova-api
        # handled the build process and would have created the mapping
        inst_mapping = objects.InstanceMapping.get_by_instance_uuid(
            context, instance.uuid)
        inst_mapping.cell_mapping = cell
        inst_mapping.save()

        if not self._delete_build_request(
                context, build_request, instance, cell, instance_bdms,
                instance_tags):
            # The build request was deleted before/during scheduling so
            # the instance is gone and we don't have anything to build for
            # this one.
            continue

        # NOTE(danms): Compute RPC expects security group names or ids
        # not objects, so convert this to a list of names until we can
        # pass the objects.
        legacy_secgroups = [s.identifier
                            for s in request_spec.security_groups]
        with obj_target_cell(instance, cell) as cctxt:
            self.compute_rpcapi.build_and_run_instance(
                cctxt, instance=instance, image=image,
                request_spec=request_spec,
                filter_properties=filter_props,
                admin_password=admin_password,
                injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=legacy_secgroups,
                block_device_mapping=instance_bdms,
                host=host.service_host, node=host.nodename,
                limits=host.limits, host_list=host_list)
此方法比较复杂,第9行调用_schedule_instances()获取符合创建虚机的主机
def _schedule_instances(self, context, request_spec,
                        instance_uuids=None, return_alternates=False):
    """Ask the scheduler for destinations and log how long it took.

    :returns: whatever ``self.query_client.select_destinations`` returns
        for this request (requested with ``return_objects=True``).
    """
    scheduler_utils.setup_instance_group(context, request_spec)
    with timeutils.StopWatch() as timer:
        host_lists = self.query_client.select_destinations(
            context, request_spec, instance_uuids, return_objects=True,
            return_alternates=return_alternates)
    # instance_uuids defaults to None, so guard len() to keep the debug
    # log from raising TypeError after a successful scheduling call.
    LOG.debug('Took %0.2f seconds to select destinations for %s '
              'instance(s).', timer.elapsed(), len(instance_uuids or []))
    return host_lists
第5行调用select_destinations()方法,此方法调用scheduler_rpcapi中的select_destinations()方法进行rpc调用
def select_destinations(self, context, spec_obj, instance_uuids,
                        return_objects=False, return_alternates=False):
    """Returns destinations(s) best suited for this request_spec and
    filter_properties.

    When return_objects is False, the result will be the "old-style" list
    of dicts with 'host', 'nodename' and 'limits' as keys. The value of
    return_alternates is ignored.

    When return_objects is True, the result will be a list of lists of
    Selection objects, with one list per instance. Each instance's list
    will contain a Selection representing the selected (and claimed) host,
    and, if return_alternates is True, zero or more Selection objects that
    represent alternate hosts. The number of alternates returned depends on
    the configuration setting `CONF.scheduler.max_attempts`.
    """
    # Thin pass-through: all arguments go positionally to the RPC API.
    return self.scheduler_rpcapi.select_destinations(
        context, spec_obj, instance_uuids, return_objects,
        return_alternates)
def select_destinations(self, ctxt, spec_obj, instance_uuids,
                        return_objects=False, return_alternates=False):
    """Call ``select_destinations`` on the scheduler over RPC.

    Starts at RPC version 4.5 and steps down through 4.4, 4.3 and 4.0,
    removing or converting the arguments the older versions do not
    accept.

    :returns: the result of the synchronous RPC call.
    :raises exc.SelectionObjectsWithOldRPCVersionNotSupported: if 4.5
        cannot be sent but ``return_objects`` or ``return_alternates``
        was requested.
    """
    # Modify the parameters if an older version is requested
    version = '4.5'
    msg_args = {'instance_uuids': instance_uuids,
                'spec_obj': spec_obj,
                'return_objects': return_objects,
                'return_alternates': return_alternates}
    if not self.client.can_send_version(version):
        if msg_args['return_objects'] or msg_args['return_alternates']:
            # The client is requesting an RPC version we can't support.
            raise exc.SelectionObjectsWithOldRPCVersionNotSupported(
                version=self.client.version_cap)
        del msg_args['return_objects']
        del msg_args['return_alternates']
        version = '4.4'
    if not self.client.can_send_version(version):
        del msg_args['instance_uuids']
        version = '4.3'
    if not self.client.can_send_version(version):
        del msg_args['spec_obj']
        msg_args['request_spec'] = spec_obj.to_legacy_request_spec_dict()
        msg_args['filter_properties'] = (
            spec_obj.to_legacy_filter_properties_dict())
        version = '4.0'
    cctxt = self.client.prepare(
        version=version, call_monitor_timeout=CONF.rpc_response_timeout,
        timeout=CONF.long_rpc_timeout)
    return cctxt.call(ctxt, 'select_destinations', **msg_args)
注:精力有限,后续方法可自行阅读源码,方法如上
回归正题
此方法多为操作数据库,进行数据查询并判断,最后遍历host_list进行虚机创建,此处由于是for循环,所以每创建一个虚机都会调用一次build_and_run_instance()方法
def build_and_run_instance(self, ctxt, instance, host, image, request_spec,
                           filter_properties, admin_password=None,
                           injected_files=None, requested_networks=None,
                           security_groups=None, block_device_mapping=None,
                           node=None, limits=None, host_list=None):
    """Cast ``build_and_run_instance`` to the compute service on ``host``.

    The message is sent at RPC version 5.0 as a cast, so nothing is
    returned.
    """
    # NOTE(edleafe): compute nodes can only use the dict form of limits.
    if isinstance(limits, objects.SchedulerLimits):
        limits = limits.to_dict()
    kwargs = {"instance": instance,
              "image": image,
              "request_spec": request_spec,
              "filter_properties": filter_properties,
              "admin_password": admin_password,
              "injected_files": injected_files,
              "requested_networks": requested_networks,
              "security_groups": security_groups,
              "block_device_mapping": block_device_mapping,
              "node": node,
              "limits": limits,
              "host_list": host_list,
              }
    client = self.router.client(ctxt)
    version = '5.0'
    cctxt = client.prepare(server=host, version=version)
    cctxt.cast(ctxt, 'build_and_run_instance', **kwargs)
此方法主要功能是用rpc调用nova/compute/manager.py中build_and_run_instance()方法
@wrap_exception()
@reverts_task_state
@wrap_instance_fault
def build_and_run_instance(self, context, instance, image, request_spec,
                           filter_properties, admin_password=None,
                           injected_files=None, requested_networks=None,
                           security_groups=None, block_device_mapping=None,
                           node=None, limits=None, host_list=None):
    """Start building the instance without blocking the caller.

    The actual work runs in ``_locked_do_build_and_run_instance``, which
    is synchronized on the instance uuid, limited by
    ``self._build_semaphore`` and started through ``utils.spawn_n``.
    """

    @utils.synchronized(instance.uuid)
    def _locked_do_build_and_run_instance(*args, **kwargs):
        # NOTE(danms): We grab the semaphore with the instance uuid
        # locked because we could wait in line to build this instance
        # for a while and we want to make sure that nothing else tries
        # to do anything with this instance while we wait.
        with self._build_semaphore:
            try:
                result = self._do_build_and_run_instance(*args, **kwargs)
            except Exception:
                # NOTE(mriedem): This should really only happen if
                # _decode_files in _do_build_and_run_instance fails, and
                # that's before a guest is spawned so it's OK to remove
                # allocations for the instance for this node from Placement
                # below as there is no guest consuming resources anyway.
                # The _decode_files case could be handled more specifically
                # but that's left for another day.
                result = build_results.FAILED
                raise
            finally:
                if result == build_results.FAILED:
                    # Remove the allocation records from Placement for the
                    # instance if the build failed. The instance.host is
                    # likely set to None in _do_build_and_run_instance
                    # which means if the user deletes the instance, it
                    # will be deleted in the API, not the compute service.
                    # Setting the instance.host to None in
                    # _do_build_and_run_instance means that the
                    # ResourceTracker will no longer consider this instance
                    # to be claiming resources against it, so we want to
                    # reflect that same thing in Placement.  No need to
                    # call this for a reschedule, as the allocations will
                    # have already been removed in
                    # self._do_build_and_run_instance().
                    self.reportclient.delete_allocation_for_instance(
                        context, instance.uuid)

                if result in (build_results.FAILED,
                              build_results.RESCHEDULED):
                    self._build_failed(node)
                else:
                    self._build_succeeded(node)

    # NOTE(danms): We spawn here to return the RPC worker thread back to
    # the pool. Since what follows could take a really long time, we don't
    # want to tie up RPC workers.
    utils.spawn_n(_locked_do_build_and_run_instance,
                  context, instance, image, request_spec,
                  filter_properties, admin_password, injected_files,
                  requested_networks, security_groups,
                  block_device_mapping, node, limits, host_list)
def spawn_n(func, *args, **kwargs):
    """Passthrough method for eventlet.spawn_n.

    This utility exists so that it can be stubbed for testing without
    interfering with the service spawns.

    It will also grab the context from the threadlocal store and add it to
    the store on the new thread.  This allows for continuity in logging the
    context when using this method to spawn a new thread.

    The return value of ``func`` is discarded and nothing is returned.
    """
    # Captured here, in the calling thread, and used by the wrapper below.
    _context = common_context.get_current()
    profiler_info = _serialize_profile_info()

    @functools.wraps(func)
    def context_wrapper(*args, **kwargs):
        # NOTE: If update_store is not called after spawn_n it won't be
        # available for the logger to pull from threadlocal storage.
        if _context is not None:
            _context.update_store()
        if profiler_info and profiler:
            profiler.init(**profiler_info)
        func(*args, **kwargs)

    eventlet.spawn_n(context_wrapper, *args, **kwargs)
此方法使用协程进行创建虚机(为保证数据一致性,创建前需要加锁)
手机扫一扫
移动阅读更方便
你可能感兴趣的文章