我有两个这样的数据帧
网络
+----------------+-------+
| Network | VLAN |
+----------------+-------+
| 192.168.1.0/24 | VLAN1 |
| 192.168.2.0/24 | VLAN2 |
+----------------+-------+
流动
+--------------+----------------+
| source_ip | destination_ip |
+--------------+----------------+
| 192.168.1.11 | 192.168.2.13 |
+--------------+----------------+
理想情况下,我想得到这样的东西
+--------------+----------------+-------------+------------------+
| source_ip | destination_ip | source_vlan | destination_vlan |
+--------------+----------------+-------------+------------------+
| 192.168.1.11 | 192.168.2.13 | VLAN1 | VLAN2 |
+--------------+----------------+-------------+------------------+
不幸的是,流数据帧不包含子网掩码。到目前为止我在没有pyspark的情况下尝试了什么
ipaddress.ip_network('{}/{}'.format(ip,sub), strict=False)
我试着用pyspark做一个类似的方法,但效果不如我认为有更好的方法吗
def get_available_subnets(df):
split_col = split(df['network'], '/')
df = df.withColumn('sub', split_col.getItem(1))
return df.select('sub').distinct()
def get_vlan_by_ip(ip, infoblox, subnets):
for sub in subnets:
net = ipaddress.ip_network('{}/{}'.format(ip,sub), strict=False)
if net:
search = infoblox.filter(infoblox.network == str(net))
if not search.head(1).isEmpty():
return search.first.vlan
return hashlib.sha1(str.encode(ip)).hexdigest()
subnets = get_available_subnets(infoblox_networks_df).select('sub').rdd.flatMap(lambda x: x).collect()
short = flows_filtered_prepared_df.limit(1000)
partial_vlan_func = partial(get_vlan_by_ip, infoblox=infoblox_networks_df, subnets=subnets)
get_vlan_udf = udf(lambda ip: partial_vlan_func(ip), StringType())
short.select('source_ip', 'destination_ip', get_vlan_udf('source_ip').alias('source_vlan')).show()
这种方法完全避免了使用
udf
,利用了split
和slice
,但也许有更好的方法。这种方法的好处是它直接利用子网掩码中存在的位,并且它完全是在PySpark
中写入的解决方案的上下文:IP地址可以被子网分割和屏蔽。这意味着
8, 16, 24, 32
会告诉您IP的哪些部分是重要的-这会促使除以8,并在IP地址ArrayType
列与其原始StringType
分离后使用结果列对其进行切片注意:
pyspark.sql.functions.slice
将在更新版本的PySpark >= 2.4
中工作,一些旧版本需要使用f.expr("slice(...)")
设置:
一些预处理:
最后,根据我的掩码的
bits
在切片上执行IcrossJoin
和过滤,例如:对于
destination_ip
可以采用完全相同的方法,例如:最后,您可以在
source_ip
和destination_ip
上将生成的两个表连接在一起(因为您根据需要附加了vlan
信息),或者将前面的两个步骤合并在一起,然后crossJoin
和filter
两次希望这有帮助
相关问题 更多 >
编程相关推荐