# Copyright 2012-2013 James McCauley
"""
A shortest-path forwarding application.
This is a standalone L2 switch that learns ethernet addresses
across the entire network and picks short paths between them.
You shouldn't really write an application this way -- you should
keep more state in the controller (that is, your flow tables),
and/or you should make your topology more static. However, this
does (mostly) work. :)
Depends on openflow.discovery
Works with openflow.spanning_tree
"""
from pox.core import core
import pox.openflow.libopenflow_01 as of
from pox.lib.revent import *
from pox.lib.recoco import Timer
from collections import defaultdict
from pox.openflow.discovery import Discovery
from pox.lib.util import dpid_to_str
import time
log = core.getLogger()
adjacency = defaultdict(lambda:defaultdict(lambda:None))
switches = {}
mac_map = {}
path_map = defaultdict(lambda:defaultdict(lambda:(None,None)))
waiting_paths = {}
FLOOD_HOLDDOWN = 5
FLOW_IDLE_TIMEOUT = 10
FLOW_HARD_TIMEOUT = 30
PATH_SETUP_TIME = 4
def _calc_paths ():
"""
Essentially Floyd-Warshall algorithm
"""
def dump ():
for i in sws:
for j in sws:
a = path_map[i][j][0]
#a = adjacency[i][j]
if a is None: a = "*"
print a,
print
sws = switches.values()
path_map.clear()
for k in sws:
for j,port in adjacency[k].iteritems():
if port is None: continue
path_map[k][j] = (1,None)
path_map[k][k] = (0,None) # distance, intermediate
#dump()
for k in sws:
for i in sws:
for j in sws:
if path_map[i][k][0] is not None:
if path_map[k][j][0] is not None:
# i -> k -> j exists
ikj_dist = path_map[i][k][0]+path_map[k][j][0]
if path_map[i][j][0] is None or ikj_dist < path_map[i][j][0]:
# i -> k -> j is better than existing
path_map[i][j] = (ikj_dist, k)
#print "--------------------"
#dump()
def _get_raw_path (src, dst):
"""
Get a raw path (just a list of nodes to traverse)
"""
if len(path_map) == 0: _calc_paths()
if src is dst:
# We're here!
return []
if path_map[src][dst][0] is None:
return None
intermediate = path_map[src][dst][1]
if intermediate is None:
# Directly connected
return []
return _get_raw_path(src, intermediate) + [intermediate] + \
_get_raw_path(intermediate, dst)
def _check_path (p):
"""
Make sure that a path is actually a string of nodes with connected ports
returns True if path is valid
"""
for a,b in zip(p[:-1],p[1:]):
if adjacency[a[0]][b[0]] != a[2]:
return False
if adjacency[b[0]][a[0]] != b[1]:
return False
return True
def _get_path (src, dst, first_port, final_port):
"""
Gets a cooked path -- a list of (node,in_port,out_port)
"""
# Start with a raw path…
if src == dst:
path = [src]
else:
path = _get_raw_path(src, dst)
if path is None: return None
path = [src] + path + [dst]
# Now add the ports
r = []
in_port = first_port
for s1,s2 in zip(path[:-1],path[1:]):
out_port = adjacency[s1][s2]
r.append((s1,in_port,out_port))
in_port = adjacency[s2][s1]
r.append((dst,in_port,final_port))
assert _check_path(r), "Illegal path!"
return r
class WaitingPath (object):
"""
A path which is waiting for its path to be established
"""
def __init__ (self, path, packet):
"""
xids is a sequence of (dpid,xid)
first_switch is the DPID where the packet came from
packet is something that can be sent in a packet_out
"""
self.expires_at = time.time() + PATH_SETUP_TIME
self.path = path
self.first_switch = path[0][0].dpid
self.xids = set()
self.packet = packet
if len(waiting\_paths) > 1000:
WaitingPath.expire\_waiting\_paths()
def add_xid (self, dpid, xid):
self.xids.add((dpid,xid))
waiting_paths[(dpid,xid)] = self
@property
def is_expired (self):
return time.time() >= self.expires_at
def notify (self, event):
"""
Called when a barrier has been received
"""
self.xids.discard((event.dpid,event.xid))
if len(self.xids) == 0:
# Done!
if self.packet:
log.debug("Sending delayed packet out %s"
% (dpid_to_str(self.first_switch),))
msg = of.ofp_packet_out(data=self.packet,
action=of.ofp_action_output(port=of.OFPP_TABLE))
core.openflow.sendToDPID(self.first_switch, msg)
core.l2\_multi.raiseEvent(PathInstalled(self.path))
@staticmethod
def expire_waiting_paths ():
packets = set(waiting_paths.values())
killed = 0
for p in packets:
if p.is_expired:
killed += 1
for entry in p.xids:
waiting_paths.pop(entry, None)
if killed:
log.error("%i paths failed to install" % (killed,))
class PathInstalled (Event):
"""
Fired when a path is installed
"""
def __init__ (self, path):
Event.__init__(self)
self.path = path
class Switch (EventMixin):
def __init__ (self):
self.connection = None
self.ports = None
self.dpid = None
self._listeners = None
self._connected_at = None
def __repr__ (self):
return dpid_to_str(self.dpid)
def _install (self, switch, in_port, out_port, match, buf = None):
msg = of.ofp_flow_mod()
msg.match = match
msg.match.in_port = in_port
msg.idle_timeout = FLOW_IDLE_TIMEOUT
msg.hard_timeout = FLOW_HARD_TIMEOUT
msg.actions.append(of.ofp_action_output(port = out_port))
msg.buffer_id = buf
switch.connection.send(msg)
def _install_path (self, p, match, packet_in=None):
wp = WaitingPath(p, packet_in)
for sw,in_port,out_port in p:
self._install(sw, in_port, out_port, match)
msg = of.ofp_barrier_request()
sw.connection.send(msg)
wp.add_xid(sw.dpid,msg.xid)
def install_path (self, dst_sw, last_port, match, event):
"""
Attempts to install a path between this switch and some destination
"""
p = _get_path(self, dst_sw, event.port, last_port)
if p is None:
log.warning("Can't get from %s to %s", match.dl_src, match.dl_dst)
import pox.lib.packet as pkt
if (match.dl\_type == pkt.ethernet.IP\_TYPE and
event.parsed.find('ipv4')):
# It's IP -- let's send a destination unreachable
log.debug("Dest unreachable (%s -> %s)",
match.dl\_src, match.dl\_dst)
from pox.lib.addresses import EthAddr
e = pkt.ethernet()
e.src = EthAddr(dpid\_to\_str(self.dpid)) #FIXME: Hmm...
e.dst = match.dl\_src
e.type = e.IP\_TYPE
ipp = pkt.ipv4()
ipp.protocol = ipp.ICMP\_PROTOCOL
ipp.srcip = match.nw\_dst #FIXME: Ridiculous
ipp.dstip = match.nw\_src
icmp = pkt.icmp()
icmp.type = pkt.ICMP.TYPE\_DEST\_UNREACH
icmp.code = pkt.ICMP.CODE\_UNREACH\_HOST
orig\_ip = event.parsed.find('ipv4')
d = orig\_ip.pack()
d = d\[:orig\_ip.hl \* 4 + 8\]
import struct
d = struct.pack("!HH", 0,0) + d #FIXME: MTU
icmp.payload = d
ipp.payload = icmp
e.payload = ipp
msg = of.ofp\_packet\_out()
msg.actions.append(of.ofp\_action\_output(port = event.port))
msg.data = e.pack()
self.connection.send(msg)
return
log.debug("Installing path for %s -> %s %04x (%i hops)",
match.dl\_src, match.dl\_dst, match.dl\_type, len(p))
# We have a path -- install it
self.\_install\_path(p, match, event.ofp)
# Now reverse it and install it backwards
# (we'll just assume that will work)
p = \[(sw,out\_port,in\_port) for sw,in\_port,out\_port in p\]
self.\_install\_path(p, match.flip())
def _handle_PacketIn (self, event):
def flood ():
""" Floods the packet """
if self.is_holding_down:
log.warning("Not flooding -- holddown active")
msg = of.ofp_packet_out()
# OFPP_FLOOD is optional; some switches may need OFPP_ALL
msg.actions.append(of.ofp_action_output(port = of.OFPP_FLOOD))
msg.buffer_id = event.ofp.buffer_id
msg.in_port = event.port
self.connection.send(msg)
def drop ():
# Kill the buffer
if event.ofp.buffer\_id is not None:
msg = of.ofp\_packet\_out()
msg.buffer\_id = event.ofp.buffer\_id
event.ofp.buffer\_id = None # Mark is dead
msg.in\_port = event.port
self.connection.send(msg)
packet = event.parsed
loc = (self, event.port) # Place we saw this ethaddr
oldloc = mac\_map.get(packet.src) # Place we last saw this ethaddr
if packet.effective\_ethertype == packet.LLDP\_TYPE:
drop()
return
if oldloc is None:
if packet.src.is\_multicast == False:
mac\_map\[packet.src\] = loc # Learn position for ethaddr
log.debug("Learned %s at %s.%i", packet.src, loc\[0\], loc\[1\])
elif oldloc != loc:
# ethaddr seen at different place!
if core.openflow\_discovery.is\_edge\_port(loc\[0\].dpid, loc\[1\]):
# New place is another "plain" port (probably)
log.debug("%s moved from %s.%i to %s.%i?", packet.src,
dpid\_to\_str(oldloc\[0\].dpid), oldloc\[1\],
dpid\_to\_str( loc\[0\].dpid), loc\[1\])
if packet.src.is\_multicast == False:
mac\_map\[packet.src\] = loc # Learn position for ethaddr
log.debug("Learned %s at %s.%i", packet.src, loc\[0\], loc\[1\])
elif packet.dst.is\_multicast == False:
# New place is a switch-to-switch port!
# Hopefully, this is a packet we're flooding because we didn't
# know the destination, and not because it's somehow not on a
# path that we expect it to be on.
# If spanning\_tree is running, we might check that this port is
# on the spanning tree (it should be).
if packet.dst in mac\_map:
# Unfortunately, we know the destination. It's possible that
# we learned it while it was in flight, but it's also possible
# that something has gone wrong.
log.warning("Packet from %s to known destination %s arrived "
"at %s.%i without flow", packet.src, packet.dst,
dpid\_to\_str(self.dpid), event.port)
if packet.dst.is\_multicast:
log.debug("Flood multicast from %s", packet.src)
flood()
else:
if packet.dst not in mac\_map:
log.debug("%s unknown -- flooding" % (packet.dst,))
flood()
else:
dest = mac\_map\[packet.dst\]
match = of.ofp\_match.from\_packet(packet)
self.install\_path(dest\[0\], dest\[1\], match, event)
def disconnect (self):
if self.connection is not None:
log.debug("Disconnect %s" % (self.connection,))
self.connection.removeListeners(self._listeners)
self.connection = None
self._listeners = None
def connect (self, connection):
if self.dpid is None:
self.dpid = connection.dpid
assert self.dpid == connection.dpid
if self.ports is None:
self.ports = connection.features.ports
self.disconnect()
log.debug("Connect %s" % (connection,))
self.connection = connection
self._listeners = self.listenTo(connection)
self._connected_at = time.time()
@property
def is_holding_down (self):
if self._connected_at is None: return True
if time.time() - self._connected_at > FLOOD_HOLDDOWN:
return False
return True
def _handle_ConnectionDown (self, event):
self.disconnect()
class l2_multi (EventMixin):
_eventMixin_events = set([
PathInstalled,
])
def __init__ (self):
# Listen to dependencies
def startup ():
core.openflow.addListeners(self, priority=0)
core.openflow_discovery.addListeners(self)
core.call_when_ready(startup, ('openflow','openflow_discovery'))
def _handle_LinkEvent (self, event):
def flip (link):
return Discovery.Link(link[2],link[3], link[0],link[1])
l = event.link
sw1 = switches\[l.dpid1\]
sw2 = switches\[l.dpid2\]
# Invalidate all flows and path info.
# For link adds, this makes sure that if a new link leads to an
# improved path, we use it.
# For link removals, this makes sure that we don't use a
# path that may have been broken.
#NOTE: This could be radically improved! (e.g., not \*ALL\* paths break)
clear = of.ofp\_flow\_mod(command=of.OFPFC\_DELETE)
for sw in switches.itervalues():
if sw.connection is None: continue
sw.connection.send(clear)
path\_map.clear()
if event.removed:
# This link no longer okay
if sw2 in adjacency\[sw1\]: del adjacency\[sw1\]\[sw2\]
if sw1 in adjacency\[sw2\]: del adjacency\[sw2\]\[sw1\]
# But maybe there's another way to connect these...
for ll in core.openflow\_discovery.adjacency:
if ll.dpid1 == l.dpid1 and ll.dpid2 == l.dpid2:
if flip(ll) in core.openflow\_discovery.adjacency:
# Yup, link goes both ways
adjacency\[sw1\]\[sw2\] = ll.port1
adjacency\[sw2\]\[sw1\] = ll.port2
# Fixed -- new link chosen to connect these
break
else:
# If we already consider these nodes connected, we can
# ignore this link up.
# Otherwise, we might be interested...
if adjacency\[sw1\]\[sw2\] is None:
# These previously weren't connected. If the link
# exists in both directions, we consider them connected now.
if flip(l) in core.openflow\_discovery.adjacency:
# Yup, link goes both ways -- connected!
adjacency\[sw1\]\[sw2\] = l.port1
adjacency\[sw2\]\[sw1\] = l.port2
# If we have learned a MAC on this port which we now know to
# be connected to a switch, unlearn it.
bad\_macs = set()
for mac,(sw,port) in mac\_map.iteritems():
if sw is sw1 and port == l.port1: bad\_macs.add(mac)
if sw is sw2 and port == l.port2: bad\_macs.add(mac)
for mac in bad\_macs:
log.debug("Unlearned %s", mac)
del mac\_map\[mac\]
def _handle_ConnectionUp (self, event):
sw = switches.get(event.dpid)
if sw is None:
# New switch
sw = Switch()
switches[event.dpid] = sw
sw.connect(event.connection)
else:
sw.connect(event.connection)
def _handle_BarrierIn (self, event):
wp = waiting_paths.pop((event.dpid,event.xid), None)
if not wp:
#log.info("No waiting packet %s,%s", event.dpid, event.xid)
return
#log.debug("Notify waiting packet %s,%s", event.dpid, event.xid)
wp.notify(event)
def launch ():
core.registerNew(l2_multi)
timeout = min(max(PATH_SETUP_TIME, 5) * 2, 15)
Timer(timeout, WaitingPath.expire_waiting_paths, recurring=True)
手机扫一扫
移动阅读更方便
你可能感兴趣的文章