# l2_multi.py
# (Web-page header) Read original -- date: 2023-07-11, views: 1

# Copyright 2012-2013 James McCauley
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
A shortest-path forwarding application.

This is a standalone L2 switch that learns ethernet addresses
across the entire network and picks short paths between them.

You shouldn't really write an application this way -- you should
keep more state in the controller (that is, your flow tables),
and/or you should make your topology more static. However, this
does (mostly) work. :)

Depends on openflow.discovery
Works with openflow.spanning_tree
"""

from pox.core import core
import pox.openflow.libopenflow_01 as of
from pox.lib.revent import *
from pox.lib.recoco import Timer
from collections import defaultdict
from pox.openflow.discovery import Discovery
from pox.lib.util import dpid_to_str
import time

# Module-wide logger obtained from the POX core object
log = core.getLogger()

# Adjacency map.  [sw1][sw2] -> port from sw1 to sw2
# (missing entries read as None)
adjacency = defaultdict(lambda:defaultdict(lambda:None))

# Switches we know of.  [dpid] -> Switch
switches = {}

# ethaddr -> (switch, port)
mac_map = {}

# [sw1][sw2] -> (distance, intermediate)
# (missing entries read as (None, None))
path_map = defaultdict(lambda:defaultdict(lambda:(None,None)))

# Waiting path.  (dpid,xid)->WaitingPath
waiting_paths = {}

# Time to not flood in seconds
FLOOD_HOLDDOWN = 5

# Flow timeouts
FLOW_IDLE_TIMEOUT = 10
FLOW_HARD_TIMEOUT = 30

# How long is allowable to set up a path?
PATH_SETUP_TIME = 4

def _calc_paths ():
  """
  Recompute all-pairs shortest paths (essentially Floyd-Warshall)

  Clears the module-level path_map and refills it from adjacency, so that
  path_map[a][b] is a (distance, intermediate) tuple:
   * distance is the hop count from a to b, or None if no path is known
   * intermediate is a switch on the path, or None when a and b are the
     same switch or are directly connected
  """

  def dump ():
    # Debugging aid: print the distance matrix ("*" means no known path)
    for i in sws:
      row = []
      for j in sws:
        a = path_map[i][j][0]
        #a = adjacency[i][j]
        if a is None: a = "*"
        row.append(str(a))
      # Single-argument print() behaves the same on Python 2 and 3
      print(" ".join(row))

  # Snapshot the switches so the nested loops below can re-iterate them
  sws = list(switches.values())
  path_map.clear()
  for k in sws:
    for j,port in adjacency[k].items():
      # adjacency is a defaultdict, so a mere lookup can leave a None entry
      if port is None: continue
      path_map[k][j] = (1,None)
    path_map[k][k] = (0,None) # distance, intermediate

  #dump()

  for k in sws:
    for i in sws:
      for j in sws:
        if path_map[i][k][0] is not None:
          if path_map[k][j][0] is not None:
            # i -> k -> j exists
            ikj_dist = path_map[i][k][0]+path_map[k][j][0]
            if path_map[i][j][0] is None or ikj_dist < path_map[i][j][0]:
              # i -> k -> j is better than existing
              path_map[i][j] = (ikj_dist, k)

  #print("--------------------")
  #dump()

def _get_raw_path (src, dst):
  """
  Get a raw path (just a list of nodes to traverse)

  Returns the list of switches strictly between src and dst (empty when
  they are the same switch or directly connected), or None when path_map
  holds no route from src to dst.  path_map is recomputed first if it is
  currently empty.
  """
  if len(path_map) == 0: _calc_paths()
  if src is dst:
    # We're here!
    return []
  if path_map[src][dst][0] is None:
    return None
  intermediate = path_map[src][dst][1]
  if intermediate is None:
    # Directly connected
    return []
  # Expand both halves of the path around the recorded intermediate
  return _get_raw_path(src, intermediate) + [intermediate] + \
         _get_raw_path(intermediate, dst)

def _check_path (p):
  """
  Make sure that a path is actually a string of nodes with connected ports

  p is a sequence of (node, in_port, out_port) tuples.  For every pair of
  consecutive hops, the out_port of the first and the in_port of the
  second must match what adjacency records for that link.

  returns True if path is valid
  """
  for a,b in zip(p[:-1],p[1:]):
    if adjacency[a[0]][b[0]] != a[2]:
      return False
    if adjacency[b[0]][a[0]] != b[1]:
      return False
  return True

def _get_path (src, dst, first_port, final_port):
  """
  Gets a cooked path -- a list of (node,in_port,out_port)

  first_port is used as the in_port of the first hop and final_port as the
  out_port of the last hop; the ports in between come from adjacency.
  Returns None if there is no raw path from src to dst.
  """
  # Start with a raw path...
  if src == dst:
    path = [src]
  else:
    path = _get_raw_path(src, dst)
    if path is None: return None
    path = [src] + path + [dst]

  # Now add the ports
  r = []
  in_port = first_port
  for s1,s2 in zip(path[:-1],path[1:]):
    out_port = adjacency[s1][s2]
    r.append((s1,in_port,out_port))
    in_port = adjacency[s2][s1]
  r.append((dst,in_port,final_port))

  assert _check_path(r), "Illegal path!"

  return r

class WaitingPath (object):
  """
  A path which is waiting for its path to be established
  """
  def __init__ (self, path, packet):
    """
    path is a list of (switch,in_port,out_port); its first entry's switch
    is where the packet came from
    packet is something that can be sent in a packet_out
    """
    self.expires_at = time.time() + PATH_SETUP_TIME
    self.path = path
    self.first_switch = path[0][0].dpid
    self.xids = set()  # Outstanding barriers as (dpid,xid)
    self.packet = packet

    # Opportunistically purge expired entries once the table gets large
    if len(waiting_paths) > 1000:
      WaitingPath.expire_waiting_paths()

  def add_xid (self, dpid, xid):
    """
    Record an outstanding barrier and index this path under it
    """
    self.xids.add((dpid,xid))
    waiting_paths[(dpid,xid)] = self

  @property
  def is_expired (self):
    """
    True once PATH_SETUP_TIME seconds have passed since creation
    """
    return time.time() >= self.expires_at

  def notify (self, event):
    """
    Called when a barrier has been received

    Once no barriers remain outstanding, the held packet (if any) is sent
    to the first switch with an OFPP_TABLE output action and a
    PathInstalled event is raised on core.l2_multi.
    """
    self.xids.discard((event.dpid,event.xid))
    if len(self.xids) == 0:
      # Done!
      if self.packet:
        log.debug("Sending delayed packet out %s"
                  % (dpid_to_str(self.first_switch),))
        msg = of.ofp_packet_out(data=self.packet,
            action=of.ofp_action_output(port=of.OFPP_TABLE))
        core.openflow.sendToDPID(self.first_switch, msg)

      core.l2_multi.raiseEvent(PathInstalled(self.path))

  @staticmethod
  def expire_waiting_paths ():
    """
    Drop every expired WaitingPath from waiting_paths and log how many
    """
    # A path may be indexed under several xids; dedupe before checking
    packets = set(waiting_paths.values())
    killed = 0
    for p in packets:
      if p.is_expired:
        killed += 1
        for entry in p.xids:
          waiting_paths.pop(entry, None)
    if killed:
      log.error("%i paths failed to install" % (killed,))

class PathInstalled (Event):
  """
  Fired when a path is installed

  path is the path that was handed to the WaitingPath which raised
  this event.
  """
  def __init__ (self, path):
    Event.__init__(self)
    self.path = path

class Switch (EventMixin):
  """
  Controller-side state for a single switch

  Holds the switch's current connection, learns ethernet address
  locations from packet-ins, and installs paths toward known destinations.
  """
  def __init__ (self):
    self.connection = None     # Current connection (None if disconnected)
    self.ports = None          # Ports from the first connection's features
    self.dpid = None           # Set on first connect()
    self._listeners = None     # Returned by listenTo(); used to detach
    self._connected_at = None  # time.time() of the latest connect()

  def __repr__ (self):
    return dpid_to_str(self.dpid)

  def _install (self, switch, in_port, out_port, match, buf = None):
    """
    Send switch a flow_mod that outputs match (arriving on in_port) to
    out_port, using the module's idle/hard flow timeouts
    """
    msg = of.ofp_flow_mod()
    msg.match = match
    msg.match.in_port = in_port
    msg.idle_timeout = FLOW_IDLE_TIMEOUT
    msg.hard_timeout = FLOW_HARD_TIMEOUT
    msg.actions.append(of.ofp_action_output(port = out_port))
    msg.buffer_id = buf
    switch.connection.send(msg)

  def _install_path (self, p, match, packet_in=None):
    """
    Install match on every hop of p, following each flow_mod with a
    barrier request whose xid is tracked by a new WaitingPath
    """
    wp = WaitingPath(p, packet_in)
    for sw,in_port,out_port in p:
      self._install(sw, in_port, out_port, match)
      msg = of.ofp_barrier_request()
      sw.connection.send(msg)
      wp.add_xid(sw.dpid,msg.xid)

  def install_path (self, dst_sw, last_port, match, event):
    """
    Attempts to install a path between this switch and some destination

    If no path exists and the packet is IPv4, an ICMP destination
    unreachable is sent back out of the port the packet arrived on.
    Otherwise the path is installed in both directions.
    """
    p = _get_path(self, dst_sw, event.port, last_port)
    if p is None:
      log.warning("Can't get from %s to %s", match.dl_src, match.dl_dst)

      import pox.lib.packet as pkt

      if (match.dl_type == pkt.ethernet.IP_TYPE and
          event.parsed.find('ipv4')):
        # It's IP -- let's send a destination unreachable
        log.debug("Dest unreachable (%s -> %s)",
                  match.dl_src, match.dl_dst)

        from pox.lib.addresses import EthAddr
        e = pkt.ethernet()
        e.src = EthAddr(dpid_to_str(self.dpid)) #FIXME: Hmm...
        e.dst = match.dl_src
        e.type = e.IP_TYPE
        ipp = pkt.ipv4()
        ipp.protocol = ipp.ICMP_PROTOCOL
        ipp.srcip = match.nw_dst #FIXME: Ridiculous
        ipp.dstip = match.nw_src
        icmp = pkt.icmp()
        icmp.type = pkt.ICMP.TYPE_DEST_UNREACH
        icmp.code = pkt.ICMP.CODE_UNREACH_HOST
        orig_ip = event.parsed.find('ipv4')

        # ICMP payload: 4 zero bytes, then the original IP header plus
        # the first 8 bytes that follow it
        d = orig_ip.pack()
        d = d[:orig_ip.hl * 4 + 8]
        import struct
        d = struct.pack("!HH", 0,0) + d #FIXME: MTU
        icmp.payload = d
        ipp.payload = icmp
        e.payload = ipp
        msg = of.ofp_packet_out()
        msg.actions.append(of.ofp_action_output(port = event.port))
        msg.data = e.pack()
        self.connection.send(msg)

      return

    log.debug("Installing path for %s -> %s %04x (%i hops)",
        match.dl_src, match.dl_dst, match.dl_type, len(p))

    # We have a path -- install it
    self._install_path(p, match, event.ofp)

    # Now reverse it and install it backwards
    # (we'll just assume that will work)
    p = [(sw,out_port,in_port) for sw,in_port,out_port in p]
    self._install_path(p, match.flip())

  def _handle_PacketIn (self, event):
    """
    Learn the source location, then flood or install a path for the packet
    """
    def flood ():
      """ Floods the packet """
      if self.is_holding_down:
        # NOTE(review): this says "Not flooding", yet the flood below is
        # still sent during holddown -- confirm which one is intended.
        log.warning("Not flooding -- holddown active")
      msg = of.ofp_packet_out()
      # OFPP_FLOOD is optional; some switches may need OFPP_ALL
      msg.actions.append(of.ofp_action_output(port = of.OFPP_FLOOD))
      msg.buffer_id = event.ofp.buffer_id
      msg.in_port = event.port
      self.connection.send(msg)

    def drop ():
      # Kill the buffer
      if event.ofp.buffer_id is not None:
        msg = of.ofp_packet_out()
        msg.buffer_id = event.ofp.buffer_id
        event.ofp.buffer_id = None # Mark as dead
        msg.in_port = event.port
        self.connection.send(msg)

    packet = event.parsed

    loc = (self, event.port) # Place we saw this ethaddr
    oldloc = mac_map.get(packet.src) # Place we last saw this ethaddr

    if packet.effective_ethertype == packet.LLDP_TYPE:
      drop()
      return

    if oldloc is None:
      if packet.src.is_multicast == False:
        mac_map[packet.src] = loc # Learn position for ethaddr
        log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])
    elif oldloc != loc:
      # ethaddr seen at different place!
      if core.openflow_discovery.is_edge_port(loc[0].dpid, loc[1]):
        # New place is another "plain" port (probably)
        log.debug("%s moved from %s.%i to %s.%i?", packet.src,
                  dpid_to_str(oldloc[0].dpid), oldloc[1],
                  dpid_to_str(   loc[0].dpid),    loc[1])
        if packet.src.is_multicast == False:
          mac_map[packet.src] = loc # Learn position for ethaddr
          log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])
      elif packet.dst.is_multicast == False:
        # New place is a switch-to-switch port!
        # Hopefully, this is a packet we're flooding because we didn't
        # know the destination, and not because it's somehow not on a
        # path that we expect it to be on.
        # If spanning_tree is running, we might check that this port is
        # on the spanning tree (it should be).
        if packet.dst in mac_map:
          # Unfortunately, we know the destination.  It's possible that
          # we learned it while it was in flight, but it's also possible
          # that something has gone wrong.
          log.warning("Packet from %s to known destination %s arrived "
                      "at %s.%i without flow", packet.src, packet.dst,
                      dpid_to_str(self.dpid), event.port)

    if packet.dst.is_multicast:
      log.debug("Flood multicast from %s", packet.src)
      flood()
    else:
      if packet.dst not in mac_map:
        log.debug("%s unknown -- flooding" % (packet.dst,))
        flood()
      else:
        dest = mac_map[packet.dst]
        match = of.ofp_match.from_packet(packet)
        self.install_path(dest[0], dest[1], match, event)

  def disconnect (self):
    """
    Stop listening to the current connection (if any) and forget it
    """
    if self.connection is not None:
      log.debug("Disconnect %s" % (self.connection,))
      self.connection.removeListeners(self._listeners)
      self.connection = None
      self._listeners = None

  def connect (self, connection):
    """
    Attach to connection, replacing any previous one

    dpid and ports are only taken from the first connection seen; the
    holddown timer restarts on every connect.
    """
    if self.dpid is None:
      self.dpid = connection.dpid
    assert self.dpid == connection.dpid
    if self.ports is None:
      self.ports = connection.features.ports
    self.disconnect()
    log.debug("Connect %s" % (connection,))
    self.connection = connection
    self._listeners = self.listenTo(connection)
    self._connected_at = time.time()

  @property
  def is_holding_down (self):
    """
    True until more than FLOOD_HOLDDOWN seconds have passed since the
    last connect() (and always True before the first one)
    """
    if self._connected_at is None: return True
    if time.time() - self._connected_at > FLOOD_HOLDDOWN:
      return False
    return True

  def _handle_ConnectionDown (self, event):
    self.disconnect()

class l2_multi (EventMixin):
  """
  The component itself: keeps the module-level switch and adjacency state
  up to date from OpenFlow and discovery events, and raises PathInstalled
  """

  _eventMixin_events = set([
    PathInstalled,
  ])

  def __init__ (self):
    # Listen to dependencies
    def startup ():
      core.openflow.addListeners(self, priority=0)
      core.openflow_discovery.addListeners(self)
    core.call_when_ready(startup, ('openflow','openflow_discovery'))

  def _handle_LinkEvent (self, event):
    """
    Update adjacency for a link add/remove and invalidate installed flows
    """
    def flip (link):
      # Same link, seen from the other end
      return Discovery.Link(link[2],link[3], link[0],link[1])

    l = event.link
    sw1 = switches[l.dpid1]
    sw2 = switches[l.dpid2]

    # Invalidate all flows and path info.
    # For link adds, this makes sure that if a new link leads to an
    # improved path, we use it.
    # For link removals, this makes sure that we don't use a
    # path that may have been broken.
    #NOTE: This could be radically improved! (e.g., not *ALL* paths break)
    clear = of.ofp_flow_mod(command=of.OFPFC_DELETE)
    for sw in switches.values():
      if sw.connection is None: continue
      sw.connection.send(clear)
    path_map.clear()

    if event.removed:
      # This link no longer okay
      if sw2 in adjacency[sw1]: del adjacency[sw1][sw2]
      if sw1 in adjacency[sw2]: del adjacency[sw2][sw1]

      # But maybe there's another way to connect these...
      for ll in core.openflow_discovery.adjacency:
        if ll.dpid1 == l.dpid1 and ll.dpid2 == l.dpid2:
          if flip(ll) in core.openflow_discovery.adjacency:
            # Yup, link goes both ways
            adjacency[sw1][sw2] = ll.port1
            adjacency[sw2][sw1] = ll.port2
            # Fixed -- new link chosen to connect these
            break
    else:
      # If we already consider these nodes connected, we can
      # ignore this link up.
      # Otherwise, we might be interested...
      if adjacency[sw1][sw2] is None:
        # These previously weren't connected.  If the link
        # exists in both directions, we consider them connected now.
        if flip(l) in core.openflow_discovery.adjacency:
          # Yup, link goes both ways -- connected!
          adjacency[sw1][sw2] = l.port1
          adjacency[sw2][sw1] = l.port2

      # If we have learned a MAC on this port which we now know to
      # be connected to a switch, unlearn it.
      # (Collect first; mac_map must not change size while iterated.)
      bad_macs = set()
      for mac,(sw,port) in mac_map.items():
        if sw is sw1 and port == l.port1: bad_macs.add(mac)
        if sw is sw2 and port == l.port2: bad_macs.add(mac)
      for mac in bad_macs:
        log.debug("Unlearned %s", mac)
        del mac_map[mac]

  def _handle_ConnectionUp (self, event):
    """
    Create the Switch for a new dpid if needed, then (re)connect it
    """
    sw = switches.get(event.dpid)
    if sw is None:
      # New switch
      sw = Switch()
      switches[event.dpid] = sw
    sw.connect(event.connection)

  def _handle_BarrierIn (self, event):
    """
    Pass a barrier reply on to the WaitingPath registered for it, if any
    """
    wp = waiting_paths.pop((event.dpid,event.xid), None)
    if not wp:
      #log.info("No waiting packet %s,%s", event.dpid, event.xid)
      return
    #log.debug("Notify waiting packet %s,%s", event.dpid, event.xid)
    wp.notify(event)

def launch ():
  """
  Register the l2_multi component and start the recurring timer that
  expires waiting paths
  """
  core.registerNew(l2_multi)

  # Twice the larger of PATH_SETUP_TIME and 5 seconds, capped at 15 seconds
  timeout = min(max(PATH_SETUP_TIME, 5) * 2, 15)
  Timer(timeout, WaitingPath.expire_waiting_paths, recurring=True)