Hi,
I would like to create a thread pool when my module is activated and, whenever I need to send a flow mod
(add/modify/delete) to a dpid, submit a task to that pool. Each task uses one pool thread to send the flow mod
to a particular dpid.
With the code snippet below I can add a flow (i.e. rules at the different switches along a path) proactively through the
REST API with no hassle. However, when the controller receives a PKT_IN and needs to push more rules reactively, it
causes a deadlock (I believe): all datapaths along the path are revoked and their existing rules are wiped out.
Here are the relevant parts of the code handling my workflow:
/**
 * Threading variables & constants
 */
private DefaultThreadExecutor flowExecutor;
private static final int MIN_THREAD_POOL_SIZE = 5;
private static final int MAX_THREAD_POOL_SIZE = 50;

/* initialize thread pool upon bundle activation */
@Activate
private void activate() {
    int totalDpids = cs.getAllDataPathInfo().size();
    int n = Math.max(totalDpids, MIN_THREAD_POOL_SIZE);
    this.flowExecutor = new DefaultThreadExecutor("FlowManager", n, MAX_THREAD_POOL_SIZE, n);
}

/* Tasks that will be submitted to the thread pool */
private class AddConfirmedRuleTask implements Callable<Boolean> {
    DataPathId dp;
    long cookie;
    int hardtimeout;
    TableId table;
    int priority;
    EnumSet<FlowModFlag> flags;
    Match match;
    Instruction instr;

    public AddConfirmedRuleTask(DataPathId dp, ..., Instruction instr) {
        super();
        this.dp = dp;
        this.cookie = cookie;
        /* remaining attributes... */
        this.instr = instr;
    }

    /* Send confirmed flow mod to datapath and return true if it succeeded, false otherwise */
    public Boolean call() {
        OfmMutableFlowMod flowmod = (OfmMutableFlowMod) MessageFactory.create(PV, MessageType.FLOW_MOD,
                FlowModCommand.ADD);
        flowmod.bufferId(BufferId.NO_BUFFER)
                .hardTimeout(this.hardtimeout)
                .cookie(this.cookie)
                ...
                .addInstruction(this.instr);
        try {
            MessageFuture mf = cs.sendConfirmedFlowMod((OfmFlowMod) flowmod.toImmutable(), this.dp.dpid(),
                    FlowClass.UNSPECIFIED);
            mf.await();
            return mf.result().isSuccess();
        } catch (OpenflowException | InterruptedException e) {
            log.info("There was an internal error with flow mod -> {}", this.dp.dpid());
            return false;
        }
    }
}

/* Given a flow path, build the rules and submit each task to the thread pool */
private void pushFlow(FlowPath p) throws OpenflowException {
    Collection<AddConfirmedRuleTask> rules = new ArrayList<AddConfirmedRuleTask>();
    for (DataPathId dp : p.dpids()) {
        /* code defining table, match, instructions, etc */
        rules.add(new AddConfirmedRuleTask(dp, p.id(), ..., (Instruction) actions.toImmutable()));
    }
    try {
        for (AddConfirmedRuleTask rule : rules) {
            Future<Boolean> reply = this.flowExecutor.submit(rule);
            if (!reply.get()) throw new CreateException("At least one rule along the path couldn't be added");
        }
    } catch (InterruptedException | ExecutionException e) {
        log.info("Thread error while adding flow: {}: {}", e.getClass().getName(), e.getMessage());
    }
}

/**
 * Packet listener used to refine flows. (i.e. add tcp_src field)
 */
private class PacketListener implements SequencedPacketListener {
    @Override
    public void event(MessageContext ctx) {
        long cookie = ctx.getPacketIn().getCookie();
        FlowPath p;
        synchronized (cache) {
            p = cache.get(cookie);
        }
        // Do not process subsequent packets of refinable flows sent to controller
        if (p.isRefined()) return;
        log.info("Flow found! Refining started...");
        Ip ip = ctx.decodedPacket().get(ProtocolId.IP);
        if (ip.type().equals(IpType.TCP)) {
            Tcp tcp = ctx.decodedPacket().get(ProtocolId.TCP);
            p.setSrcport(PortNumber.valueOf(tcp.srcPort().getNumber()));
            try {
                pushFlow(p);
            } catch (OpenflowException | RuntimeException e) {
                throw new CreateException(e.getMessage());
            }
            ctx.packetOut().block();
        }
    }
}
The ultimate purpose of the PKT_IN is to get the TCP_SRC field of a flow so that I can install a higher-priority rule
that matches the complete 5-tuple (srcip, dstip, srcport, dstport, protocol).
Here's the log output. As you can tell, when adding a flow through the REST API (GENERIC flow) everything works smoothly. But
when trying to do the same operation reactively (refining a flow), it hangs while trying to add the rule at the first hop of
the path.
INFO http-bio-8443-exec-46 com.netlab.viplanes.impl.FlowPathsManager Adding GENERIC flow with cookie: ab01400000000085
INFO FlowManager-0 com.netlab.viplanes.impl.FlowPathManager Mark 2: Result: SUCCESS at 00:15:10:60:4b:b6:46:80 problem (none)
INFO FlowManager-1 com.netlab.viplanes.impl.FlowPathManager Mark 2: Result: SUCCESS at 00:15:10:60:4b:b6:46:80 problem (none)
INFO FlowManager-2 com.netlab.viplanes.impl.FlowPathManager Mark 2: Result: SUCCESS at 00:15:40:a8:f0:9b:13:00 problem (none)
INFO FlowManager-0 com.netlab.viplanes.impl.FlowPathManager Mark 2: Result: SUCCESS at 00:16:10:60:4b:b6:46:80 problem (none)
INFO http-bio-8443-exec-50 com.netlab.viplanes.impl.FlowPathManager Adding GENERIC flow with cookie: ab01c00000000085
INFO FlowManager-1 com.netlab.viplanes.impl.FlowPathManager Mark 2: Result: SUCCESS at 00:16:10:60:4b:b6:46:80 problem (none)
INFO FlowManager-2 com.netlab.viplanes.impl.FlowPathManager Mark 2: Result: SUCCESS at 00:16:10:60:4b:b6:46:80 problem (none)
INFO FlowManager-0 com.netlab.viplanes.impl.FlowPathManager Mark 2: Result: SUCCESS at 00:15:40:a8:f0:9b:13:00 problem (none)
INFO FlowManager-1 com.netlab.viplanes.impl.FlowPathManager Mark 2: Result: SUCCESS at 00:15:10:60:4b:b6:46:80 problem (none)
WARN of-pend-future-timer hp.of.ctl Aged-out futures: 8 (sweep run every 120000ms)
INFO of-io-27-thread-16 com.netlab.viplanes.impl.FlowPathManager Flow found! Refining started...
INFO of-io-27-thread-16 com.netlab.viplanes.impl.FlowPathManager Adding REFINED flow with cookie: ab01c00000000085
INFO of-io-27-thread-5 com.netlab.viplanes.impl.FlowPathManager Flow found! Refining started...
INFO of-io-27-thread-5 com.netlab.viplanes.impl.FlowPathManager Adding REFINED flow with cookie: ab01400000000085
(after 10 secs approx...)
DEBUG of-io-27-thread-6 hp.of.ctl Received connection from {OfConn:172.24.240.37}
DEBUG of-io-27-thread-6 hp.of.ctl Disconnected {OfConn:172.24.240.37}
WARN of-io-27-thread-6 hp.of.ctl Datapath REVOKED: 00:15:10:60:4b:b6:46:80, neg=V_1_3, ip=172.24.240.37
ERROR of-io-27-thread-6 hp.of.ctl Intercepted unexpected exception: java.lang.IllegalStateException: Main connection already established for dpid 00:15:10:60:4b:b6:46:80
DEBUG of-io-27-thread-8 hp.of.ctl Received connection from {OfConn:172.24.240.37}
DEBUG of-io-27-thread-8 hp.of.ctl Disconnected {OfConn:172.24.240.37}
WARN of-io-27-thread-8 hp.of.ctl Datapath REVOKED: 00:15:10:60:4b:b6:46:80, neg=V_1_3, ip=172.24.240.37
ERROR of-io-27-thread-8 hp.of.ctl Intercepted unexpected exception: java.lang.IllegalStateException: Main connection already established for dpid 00:15:10:60:4b:b6:46:80
DEBUG of-io-27-thread-9 hp.of.ctl Received connection from {OfConn:172.24.240.37}
DEBUG of-io-27-thread-9 hp.of.ctl Disconnected {OfConn:172.24.240.37}
WARN of-io-27-thread-9 hp.of.ctl Datapath REVOKED: 00:16:10:60:4b:b6:46:80, neg=V_1_3, ip=172.24.240.37
ERROR of-io-27-thread-9 hp.of.ctl Intercepted unexpected exception: java.lang.IllegalStateException: Main connection already established for dpid 00:16:10:60:4b:b6:46:80
DEBUG of-io-27-thread-10 hp.of.ctl Received connection from {OfConn:172.24.240.37}
DEBUG of-io-27-thread-10 hp.of.ctl Disconnected {OfConn:172.24.240.37}
WARN of-io-27-thread-10 hp.of.ctl Datapath REVOKED: 00:15:10:60:4b:b6:46:80, neg=V_1_3, ip=172.24.240.37
ERROR of-io-27-thread-10 hp.of.ctl Intercepted unexpected exception: java.lang.IllegalStateException: Main connection already established for dpid 00:15:10:60:4b:b6:46:80
WARN of-idle-timer hp.of.ctl Closing unresponsive connection from 172.24.240.37
DEBUG of-idle-timer hp.of.ctl Disconnected {OfConn:172.24.240.37}
DEBUG of-idle-timer hp.of.ctl Datapath connection closed: 00:16:10:60:4b:b6:46:80 aux:0
INFO of-idle-timer hp.of.ctl Datapath removed: 00:16:10:60:4b:b6:46:80, neg=V_1_3, ip=172.24.240.37
DEBUG DpQPool-8-thread-3 hp.sdn.dd DE0005I Disconnected event for dpid 00:16:10:60:4b:b6:46:80
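To make my deadlock suspicion concrete, here is a minimal standalone sketch of the pattern I think I am hitting. It uses only java.util.concurrent and none of the controller API. The single-threaded `io` executor is my assumption about how a reply for a given connection gets delivered (I have not confirmed how the of-io threads are actually scheduled), and `BlockedIoThreadSketch` and `handlerStarves` are made-up names for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockedIoThreadSketch {

    /** Returns true if the handler timed out waiting for a reply only its own thread could deliver. */
    static boolean handlerStarves(long timeoutMs) {
        // ASSUMPTION: stand-in for the I/O thread that serves one switch connection.
        ExecutorService io = Executors.newSingleThreadExecutor();
        // Stand-in for my FlowManager pool.
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            Future<Boolean> handler = io.submit(() -> {
                // Plays the role of event(): runs ON the I/O thread and blocks on the pool.
                CompletableFuture<Boolean> confirmation = new CompletableFuture<>();
                Future<Boolean> task = workers.submit(() -> {
                    // The "switch reply" is queued behind the handler on the same I/O thread...
                    io.execute(() -> confirmation.complete(true));
                    // ...so this wait (like mf.await()) cannot finish while the handler is blocked.
                    return confirmation.get();
                });
                try {
                    task.get(timeoutMs, TimeUnit.MILLISECONDS); // like reply.get() in pushFlow
                    return false; // reply arrived in time
                } catch (TimeoutException e) {
                    return true;  // the handler starved itself
                }
            });
            return handler.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdownNow();
            workers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("handler starved: " + handlerStarves(500));
    }
}
```

If the controller behaves like this sketch, it would explain why the REST path works (the http-bio thread is not the one that has to deliver the reply) while the PKT_IN path hangs until the idle timer closes the connection.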
Any advice on how to use confirmed flow mods reactively with a thread pool?
PS. This approach "works" when I use sendFlowMod instead of sendConfirmedFlowMod, but I do need to know whether each rule was
successfully installed.
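One direction I am considering is to stop waiting inside the listener: submit the tasks, return from event() immediately, and check the results in a callback that runs on a pool thread. Below is a rough sketch of that idea with plain java.util.concurrent. `AsyncConfirmSketch`, `pushAsync` and `allConfirmed` are hypothetical names, the lambdas stand in for my AddConfirmedRuleTask instances, and I do not know yet whether sendConfirmedFlowMod cooperates with this.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncConfirmSketch {

    /** Runs one rule task on the pool without blocking the caller; a thrown exception counts as failure. */
    static CompletableFuture<Boolean> pushAsync(Executor pool, Callable<Boolean> task) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Exception e) {
                return false;
            }
        }, pool);
    }

    /** Completes with true only if every rule along the path was confirmed. */
    static CompletableFuture<Boolean> allConfirmed(List<CompletableFuture<Boolean>> replies) {
        return CompletableFuture.allOf(replies.toArray(new CompletableFuture[0]))
                // All futures are done at this point, so join() does not block.
                .thenApply(ignored -> replies.stream().allMatch(CompletableFuture::join));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Stand-ins for one task per dpid along the path.
        List<CompletableFuture<Boolean>> replies = Arrays.asList(
                pushAsync(pool, () -> true),
                pushAsync(pool, () -> true));
        allConfirmed(replies)
                .thenAccept(ok -> System.out.println("all rules confirmed: " + ok))
                .join(); // only so this demo prints before exiting; a listener would not join here
        pool.shutdown();
    }
}
```

With something like this the listener thread would only schedule work, and any rollback for a failed rule would happen in the callback rather than by throwing from event().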
Thanks!