p4学习-6:实现网络测量

实验目标

这个练习的目标是写一个P4程序允许一个主机区检测网络中所有链路的利用情况。这个练习是在基础的IPV4 forwarding练习上搭建的。具体来说,我们将修改基本的P4程序,以处理源路由探测包,使其能够在每一跳提取出口链路利用率,并将其交付给主机进行监控。

探测包由下面三种header types组成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Top-level probe header, indicates how many hops this probe
// packet has traversed so far.
//顶层的探测header,指出这个探测包经过了多少跳
header probe_t {
bit<8> hop_cnt;
}

// The data added to the probe by each switch at each hop.
//每一个交换机加到探测包上面的数据
header probe_data_t {
bit<1> bos;//bottom of stack
bit<7> swid;//switch ID
bit<8> port;
bit<32> byte_cnt;//和下面的寄存器应该对应
time_t last_time;//和下面的寄存器应该对应
time_t cur_time;
}

// Indicates the egress port the switch should send this probe
// packet out of. There is one of these headers for each hop.
//指示交换机应该发送该探测报文的出口端口。每个跳跃都有一个这样的 header
header probe_fwd_t {
bit<8> egress_spec;
}
topology
topology

拓扑如上,包含了四个主机连接到四个交换机上面,连接方式好像他们在fat tree 的pod上一样。

为了监控链路利用率,交换机将维持两个寄存器数组:

  • byte_cnt_reg 自最后一个探测包从端口传输出去以来,每个端口传输出去的字节数。
  • last_time_reg保存探测包最后一次从每个端口发送出去的时间。

P4程序将被写成V1Model形式(bmv2交换机),V1model可以参考它的官方源码

补充: FatTree胖树拓扑结构

传统结构:

image-20210420125915925
image-20210420125915925

传统数据中心采用多层级的树形结构,这种结构针对客户端/服务器(C/S)模式能有较好的效果。树形结构包括单根树和多根树。多根数的根节点往往作为备份节点存在(我们以方格代表交换机)

缺点:传统单根/多根拓扑结构有以下缺点:成本高,根部交换机必须要有足够大的带宽来满足下层服务器之间的通信;性能瓶颈,无法满足数据中心内部大规模的MapReduce和数据拷贝。

FatTree 拓扑结构:

Fat-Tree是以交换机为中心的拓扑。支持在横向拓展的同时拓展路径数目;且所有交换机均为相同端口数量的普通设备,降低了网络建设成本。

整个拓扑网络分为三个层次:自上而下分别为边缘层(edge)、汇聚层(aggregate)和核心层(core),其中汇聚层交换机与边缘层交换机构成一个pod,交换设备均采用商用交换设备。

一个k元的Fat-Tree可以归纳为5个特征:

  • 每台交换机都有k个端口;
  • 核心层为顶层,一共有(k/2)^2个交换机;
  • 一共有k个pod,每个pod有k台交换机组成。其中汇聚层和接入层各占k/2台交换机;
  • 接入层每个交换机可以容纳k/2台服务器,因此,k元Fat-Tree一共有k个pod,每个pod容纳$kk/4$个服务器,所有pod共能容纳$kk*k/4$台服务器;任意
  • 两个pod之间存在k条路径。

例子:

二叉FatTree

二叉FatTree
二叉FatTree

四叉FatTree

四叉FatTree
四叉FatTree

因此上面练习的拓扑就像四叉FatTree的一个pod

代码实现

headers部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/* -*- P4_16 -*- */
#include <core.p4>
#include <v1model.p4>

const bit<16> TYPE_IPV4 = 0x800;
const bit<16> TYPE_PROBE = 0x812;

#define MAX_HOPS 10
#define MAX_PORTS 8

/*************************************************************************
*********************** H E A D E R S ***********************************
*************************************************************************/

typedef bit<9> egressSpec_t;
typedef bit<48> macAddr_t;
typedef bit<32> ip4Addr_t;

typedef bit<48> time_t;

header ethernet_t {
macAddr_t dstAddr;
macAddr_t srcAddr;
bit<16> etherType;
}

header ipv4_t {
bit<4> version;
bit<4> ihl;
bit<8> diffserv;
bit<16> totalLen;
bit<16> identification;
bit<3> flags;
bit<13> fragOffset;
bit<8> ttl;
bit<8> protocol;
bit<16> hdrChecksum;
ip4Addr_t srcAddr;
ip4Addr_t dstAddr;
}

// Top-level probe header, indicates how many hops this probe
// packet has traversed so far.
header probe_t {
bit<8> hop_cnt;
}

// The data added to the probe by each switch at each hop.
header probe_data_t {
bit<1> bos;
bit<7> swid;
bit<8> port;
bit<32> byte_cnt;
time_t last_time;
time_t cur_time;
}

// Indicates the egress port the switch should send this probe
// packet out of. There is one of these headers for each hop.
header probe_fwd_t {
bit<8> egress_spec;
}

struct parser_metadata_t {
bit<8> remaining;
}

struct metadata {
bit<8> egress_spec;
parser_metadata_t parser_metadata;
}

struct headers {
ethernet_t ethernet;
ipv4_t ipv4;
probe_t probe;
probe_data_t[MAX_HOPS] probe_data;
probe_fwd_t[MAX_HOPS] probe_fwd;
}

比之前的IPV4 forwarding多了三个探测包的header

Parser部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

/*************************************************************************
*********************** P A R S E R ***********************************
*************************************************************************/

parser MyParser(packet_in packet,
out headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {

state start {
transition parse_ethernet;
}

state parse_ethernet {
packet.extract(hdr.ethernet);
transition select(hdr.ethernet.etherType) {
TYPE_IPV4: parse_ipv4;
TYPE_PROBE: parse_probe;
default: accept;
}
}

state parse_ipv4 {
packet.extract(hdr.ipv4);
transition accept;
}

state parse_probe {
packet.extract(hdr.probe);
meta.parser_metadata.remaining = hdr.probe.hop_cnt + 1;
transition select(hdr.probe.hop_cnt) {
0: parse_probe_fwd;
default: parse_probe_data;
}
}

state parse_probe_data {
packet.extract(hdr.probe_data.next);
transition select(hdr.probe_data.last.bos) {
1: parse_probe_fwd;
default: parse_probe_data;
}
}

state parse_probe_fwd {
packet.extract(hdr.probe_fwd.next);
meta.parser_metadata.remaining = meta.parser_metadata.remaining - 1;
// extract the forwarding data
meta.egress_spec = hdr.probe_fwd.last.egress_spec;
transition select(meta.parser_metadata.remaining) {
0: accept;
default: parse_probe_fwd;
}
}
}

流程是:先进入start状态,通过ethernet的etherType确定是 ipv4包还是探测包,如果是ipv4包,略;如果是探测包,那么看看是不是第一个跳,探测包经过第一跳的时候不会有其他信息,因此可以直接进入向前转发的状态;否则就更新matada里面的remaing,进入parse_probe_data状态解析probe_data.next的信息,知道解析到last.bos也就是栈底的时候再进入向前转发状态;向前转发状态(parse_probe_fwd)里面,使用hdr.probe.hop_cnt指出了哪一个egress_spec来处理向前转发,并且把这个端口号存在一个metadata的域里面;

Ingress Control 部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

/*************************************************************************
************ C H E C K S U M V E R I F I C A T I O N *************
*************************************************************************/

control MyVerifyChecksum(inout headers hdr, inout metadata meta) {
apply { }
}


/*************************************************************************
************** I N G R E S S P R O C E S S I N G *******************
*************************************************************************/

control MyIngress(inout headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {

action drop() {
mark_to_drop(standard_metadata);
}

action ipv4_forward(macAddr_t dstAddr, egressSpec_t port) {
standard_metadata.egress_spec = port;
hdr.ethernet.srcAddr = hdr.ethernet.dstAddr;
hdr.ethernet.dstAddr = dstAddr;
hdr.ipv4.ttl = hdr.ipv4.ttl - 1;
}

table ipv4_lpm {
key = {
hdr.ipv4.dstAddr: lpm;
}
actions = {
ipv4_forward;
drop;
NoAction;
}
size = 1024;
default_action = drop();
}

apply {
if (hdr.ipv4.isValid()) {
ipv4_lpm.apply();
}
else if (hdr.probe.isValid()) {
standard_metadata.egress_spec = (bit<9>)meta.egress_spec;
hdr.probe.hop_cnt = hdr.probe.hop_cnt + 1;
}
}
}

比basic那个实验多了一个如果hdr.probe.isValid()(也就是这个包是探测包),就记录 egress_spec并且更新hdr.probe.hop_cnt

Egress Control 部分

这部分是状态处理发生的地方,使用byte_cnt_regs寄存器在计算自最后一个探测包通过该端口以来通过每个端口的字节数;

这部分增加了一个新的probe_data ,并且填写了 bos (bottom of stack) 和 swid (switch ID);

要做的部分是填写探测包字段的其余部分,以确保您可以正确地测量链路利用率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

/*************************************************************************
**************** E G R E S S P R O C E S S I N G ********************
*************************************************************************/

control MyEgress(inout headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {

// count the number of bytes seen since the last probe
register<bit<32>>(MAX_PORTS) byte_cnt_reg;
// remember the time of the last probe
register<time_t>(MAX_PORTS) last_time_reg;

action set_swid(bit<7> swid) {
hdr.probe_data[0].swid = swid;
}

table swid {
actions = {
set_swid;
NoAction;
}
default_action = NoAction();
}

apply {
bit<32> byte_cnt;
bit<32> new_byte_cnt;
time_t last_time;
time_t cur_time = standard_metadata.egress_global_timestamp;
// increment byte cnt for this packet's port
byte_cnt_reg.read(byte_cnt, (bit<32>)standard_metadata.egress_port);
byte_cnt = byte_cnt + standard_metadata.packet_length;
// reset the byte count when a probe packet passes through
new_byte_cnt = (hdr.probe.isValid()) ? 0 : byte_cnt;
byte_cnt_reg.write((bit<32>)standard_metadata.egress_port, new_byte_cnt);

if (hdr.probe.isValid()) {
// fill out probe fields
hdr.probe_data.push_front(1);
hdr.probe_data[0].setValid();
if (hdr.probe.hop_cnt == 1) {
hdr.probe_data[0].bos = 1;
}
else {
hdr.probe_data[0].bos = 0;
}
// set switch ID field
swid.apply();
// TODO: fill out the rest of the probe packet fields
// hdr.probe_data[0].port = ...
// hdr.probe_data[0].byte_cnt = ...
// TODO: read / update the last_time_reg
// last_time_reg.read(<val>, <index>);
// last_time_reg.write(<index>, <val>);
// hdr.probe_data[0].last_time = ...
// hdr.probe_data[0].cur_time = ...
}
}
}

解答:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

/*************************************************************************
**************** E G R E S S P R O C E S S I N G ********************
*************************************************************************/

control MyEgress(inout headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {

// count the number of bytes seen since the last probe
register<bit<32>>(MAX_PORTS) byte_cnt_reg;
// remember the time of the last probe
register<time_t>(MAX_PORTS) last_time_reg;

action set_swid(bit<7> swid) {
hdr.probe_data[0].swid = swid;
}

table swid {
actions = {
set_swid;
NoAction;
}
default_action = NoAction();
}

apply {
bit<32> byte_cnt;
bit<32> new_byte_cnt;
time_t last_time;
time_t cur_time = standard_metadata.egress_global_timestamp;
// increment byte cnt for this packet's port
byte_cnt_reg.read(byte_cnt, (bit<32>)standard_metadata.egress_port);
byte_cnt = byte_cnt + standard_metadata.packet_length;
// reset the byte count when a probe packet passes through
new_byte_cnt = (hdr.probe.isValid()) ? 0 : byte_cnt;
byte_cnt_reg.write((bit<32>)standard_metadata.egress_port, new_byte_cnt);

if (hdr.probe.isValid()) {
// fill out probe fields
hdr.probe_data.push_front(1);
hdr.probe_data[0].setValid();
if (hdr.probe.hop_cnt == 1) {
hdr.probe_data[0].bos = 1;
}
else {
hdr.probe_data[0].bos = 0;
}
// set switch ID field
swid.apply();
hdr.probe_data[0].port = (bit<8>)standard_metadata.egress_port;
hdr.probe_data[0].byte_cnt = byte_cnt;
// read / update the last_time_reg
last_time_reg.read(last_time, (bit<32>)standard_metadata.egress_port);
last_time_reg.write((bit<32>)standard_metadata.egress_port, cur_time);
hdr.probe_data[0].last_time = last_time;
hdr.probe_data[0].cur_time = cur_time;
}
}
}

其余部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

/*************************************************************************
************* C H E C K S U M C O M P U T A T I O N ***************
*************************************************************************/

control MyComputeChecksum(inout headers hdr, inout metadata meta) {
apply {
update_checksum(
hdr.ipv4.isValid(),
{ hdr.ipv4.version,
hdr.ipv4.ihl,
hdr.ipv4.diffserv,
hdr.ipv4.totalLen,
hdr.ipv4.identification,
hdr.ipv4.flags,
hdr.ipv4.fragOffset,
hdr.ipv4.ttl,
hdr.ipv4.protocol,
hdr.ipv4.srcAddr,
hdr.ipv4.dstAddr },
hdr.ipv4.hdrChecksum,
HashAlgorithm.csum16);
}
}

/*************************************************************************
*********************** D E P A R S E R *******************************
*************************************************************************/

control MyDeparser(packet_out packet, in headers hdr) {
apply {
packet.emit(hdr.ethernet);
packet.emit(hdr.ipv4);
packet.emit(hdr.probe);
packet.emit(hdr.probe_data);
packet.emit(hdr.probe_fwd);
}
}

/*************************************************************************
*********************** S W I T C H *******************************
*************************************************************************/

V1Switch(
MyParser(),
MyVerifyChecksum(),
MyIngress(),
MyEgress(),
MyComputeChecksum(),
MyDeparser()
) main;

probe_hdrs.py分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from scapy.all import *

TYPE_PROBE = 0x812

class Probe(Packet):
fields_desc = [ ByteField("hop_cnt", 0)]

class ProbeData(Packet):
fields_desc = [ BitField("bos", 0, 1),
BitField("swid", 0, 7),
ByteField("port", 0),
IntField("byte_cnt", 0),
BitField("last_time", 0, 48),
BitField("cur_time", 0, 48)]

class ProbeFwd(Packet):
fields_desc = [ ByteField("egress_spec", 0)]

bind_layers(Ether, Probe, type=TYPE_PROBE)
bind_layers(Probe, ProbeFwd, hop_cnt=0)
bind_layers(Probe, ProbeData)
bind_layers(ProbeData, ProbeData, bos=0)
bind_layers(ProbeData, ProbeFwd, bos=1)
bind_layers(ProbeFwd, ProbeFwd)

这部分是在控制平面用scapy定义了探测包的结构,从bind_layers可以看出,结构关系大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Ethernet{
.....
Probe{
ProbeFwd{
ProbeFwd{
....
}
}
ProbeData{
ProbeData{
....
}
}
}
}

这样其中ProbeFwd以及ProbeData可以嵌套;具体的结构在class里面进行了定义

send.py分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python
import sys
import time
from probe_hdrs import *

def main():

probe_pkt = Ether(dst='ff:ff:ff:ff:ff:ff', src=get_if_hwaddr('eth0')) / \
Probe(hop_cnt=0) / \
ProbeFwd(egress_spec=4) / \
ProbeFwd(egress_spec=1) / \
ProbeFwd(egress_spec=4) / \
ProbeFwd(egress_spec=1) / \
ProbeFwd(egress_spec=3) / \
ProbeFwd(egress_spec=2) / \
ProbeFwd(egress_spec=3) / \
ProbeFwd(egress_spec=2) / \
ProbeFwd(egress_spec=1)# 根据拓扑也就是s1-s1的4端口出到s4,s41端口出到s2……

while True:
try:
sendp(probe_pkt, iface='eth0')# 每隔一秒发一个探测包
time.sleep(1)
except KeyboardInterrupt:
sys.exit()

if __name__ == '__main__':
main()

这部分要结合拓扑图来看:

topology
topology

这部分probe_pkt这个包其实仔细观察不难发现,它里面的ProbeFwd和拓扑图的路线完全一致,假设是s1进行了发包,那么就是从s1-s1的4端口出到s4,s4的1端口出到s2……然后完成了一个回路到了s1;下面的代码只是每隔一秒钟发一下罢了;

receive.py分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python

from probe_hdrs import *

def expand(x):
yield x
while x.payload:
x = x.payload
yield x

def handle_pkt(pkt):
if ProbeData in pkt:
data_layers = [l for l in expand(pkt) if l.name=='ProbeData']
print ""
for sw in data_layers:
utilization = 0 if sw.cur_time == sw.last_time else 8.0*sw.byte_cnt/(sw.cur_time - sw.last_time)
print "Switch {} - Port {}: {} Mbps".format(sw.swid, sw.port, utilization)

def main():
iface = 'eth0'
print "sniffing on {}".format(iface)
sniff(iface = iface,
prn = lambda x: handle_pkt(x)) #prn指定回调函数,每当一个符合filter的报文被探测到时,就会执行回调函数,通常使用lambda表达式来写回调函数

if __name__ == '__main__':
main()

expand 这个就是一个走yield的递归的函数;

handle_pkt这部分首先实会不停的往后整pkt得到ProbeData的部分然后存到data_layer里面,然后解析里面的内容,看utilization,计算公式就是以Bit为单位的数据除以时间;

main还是里面的sniff是一个过滤器,其实是iface为”eth0”的包就扔到handle_pkt里面去处理;

实验过程

  1. 跑:

    1
    make run
    • 编译link_monitor.p4
    • 在mininet里面启动如上面图片的拓扑并且将所有的交换机都设置好p4程序和相应的table entries
    • 根据topology.json设置所有的主机
  2. 使用mininet打开h1端口,开两个

    1
    mininet> xterm h1 h1
  3. 在一个窗口里面跑send.py脚本可以开始每秒发送探测包。探测包的路线和拓扑图一样

    1
    ./send.py
  4. 在另一个窗口跑receive.py可以开始接受并且接受这些探测包

    1
    ./receive.py

    报告的链路利用率和交换机端口号将始终为0,因为探测字段还没有填写。

  5. 在h1和h4之间开一个iperf流

    1
    mininet>iperf h1 h4
  6. 在所有窗口里面exit然后make stop

测量的链路利用率与iperf报告的不一致,因为探测包字段还没有填充。您的目标是填写探测包字段,以便两个测量结果一致。

控制平面的一些说明

P4程序定义了一个包处理管道,但是每个表中的规则是由控制平面插入的。当一个规则匹配一个包时,它的操作将被控制平面作为规则的一部分提供的参数调用。

在这个练习中,我们已经为您实现了控制平面逻辑。作为启动Mininet实例的一部分,make run命令将在每个交换机的表中安装包处理规则。这些是在sX-runtime.json中定义的,其中X 对应开关号。

注意点:我们使用P4Runtime来安装控制平面规则。 sX-runtime.json文件的运行时的内容指的是表、键和动作的特定名称,如编译器生成的P4Info文件中定义的(在执行make run后查找 build/link_monitor.p4.p4info.txt文件)。P4程序中添加或重命名表、键或操作的任何更改都需要反映在这些 sX-runtime.json文件中。