基于函数的Pyspark连接数据帧

2024-06-02 10:06:24 发布

您现在位置:Python中文网/ 问答频道 /正文

我有两个这样的数据帧

网络

+----------------+-------+
|    Network     | VLAN  |
+----------------+-------+
| 192.168.1.0/24 | VLAN1 |
| 192.168.2.0/24 | VLAN2 |
+----------------+-------+

流动

+--------------+----------------+
|  source_ip   | destination_ip |
+--------------+----------------+
| 192.168.1.11 | 192.168.2.13   |
+--------------+----------------+

理想情况下,我想得到这样的东西

+--------------+----------------+-------------+------------------+
|  source_ip   | destination_ip | source_vlan | destination_vlan |
+--------------+----------------+-------------+------------------+
| 192.168.1.11 | 192.168.2.13   | VLAN1       | VLAN2            |
+--------------+----------------+-------------+------------------+

不幸的是,流数据帧不包含子网掩码。到目前为止我在没有pyspark的情况下尝试了什么

  1. 获取不同的子网列表(在本例中为[24])
  2. 对于每个子网和源ip计算网络ipaddress.ip_network('{}/{}'.format(ip,sub), strict=False)
  3. 搜索数据帧“网络”中是否存在该子网,并返回VLAN,否则返回空字符串

我试着用pyspark做一个类似的方法,但效果不如我认为有更好的方法吗

def get_available_subnets(df):
    split_col = split(df['network'], '/')
    df = df.withColumn('sub', split_col.getItem(1))
    return df.select('sub').distinct()

def get_vlan_by_ip(ip, infoblox, subnets):
    for sub in subnets:
        net = ipaddress.ip_network('{}/{}'.format(ip,sub), strict=False)
        if net:
            search = infoblox.filter(infoblox.network == str(net))

            if not search.head(1).isEmpty():
                return search.first.vlan
    return hashlib.sha1(str.encode(ip)).hexdigest()

subnets = get_available_subnets(infoblox_networks_df).select('sub').rdd.flatMap(lambda x: x).collect()


short = flows_filtered_prepared_df.limit(1000)


partial_vlan_func = partial(get_vlan_by_ip, infoblox=infoblox_networks_df, subnets=subnets)
get_vlan_udf = udf(lambda ip: partial_vlan_func(ip), StringType())

short.select('source_ip', 'destination_ip', get_vlan_udf('source_ip').alias('source_vlan')).show()

Tags: 数据ip网络sourcedfgetreturnnetwork
1条回答
网友
1楼 · 发布于 2024-06-02 10:06:24

这种方法完全避免了使用udf,利用了splitslice,但也许有更好的方法。这种方法的好处是它直接利用子网掩码中存在的位,并且它完全是在PySpark中写入的

解决方案的上下文:IP地址可以被子网分割和屏蔽。这意味着8, 16, 24, 32会告诉您IP的哪些部分是重要的-这会促使除以8,并在IP地址ArrayType列与其原始StringType分离后使用结果列对其进行切片

注意:pyspark.sql.functions.slice将在更新版本的PySpark >= 2.4中工作,一些旧版本需要使用f.expr("slice(...)")

设置:

flows = spark.createDataFrame([
    (1, "192.168.1.1", "192.168.2.1"),
    (2, "192.168.2.1", "192.168.3.1"), 
    (3, "192.168.3.1", "192.168.1.1"), 
], ['id', 'source_ip', 'destination_ip'] 
)
networks = spark.createDataFrame([
    (1, "192.168.1.0/24", "VLAN1"),
    (2, "192.168.2.0/24", "VLAN2"), 
    (3, "192.168.3.0/24", "VLAN3"), 
], ['id', 'network', 'vlan'] 
)

一些预处理:

networks_split = networks.select(
    "*",
    (f.split(f.col("network"), "/")[1] / 8).cast("int").alias("bits"),
    f.split(f.split(f.col("network"), "/")[0], "\.").alias('segmented_ip')
)
networks_split.show()
+ -+       +  -+  +        +
| id|       network| vlan|bits|    segmented_ip|
+ -+       +  -+  +        +
|  1|192.168.1.0/24|VLAN1|   3|[192, 168, 1, 0]|
|  2|192.168.2.0/24|VLAN2|   3|[192, 168, 2, 0]|
|  3|192.168.3.0/24|VLAN3|   3|[192, 168, 3, 0]|
+ -+       +  -+  +        +

networks_masked = networks_split.select(
    "*",
    f.expr("slice(segmented_ip, 1, bits)").alias("masked_bits"),
)
networks_masked.show()
+ -+       +  -+  +        +      -+
| id|       network| vlan|bits|    segmented_ip|  masked_bits|
+ -+       +  -+  +        +      -+
|  1|192.168.1.0/24|VLAN1|   3|[192, 168, 1, 0]|[192, 168, 1]|
|  2|192.168.2.0/24|VLAN2|   3|[192, 168, 2, 0]|[192, 168, 2]|
|  3|192.168.3.0/24|VLAN3|   3|[192, 168, 3, 0]|[192, 168, 3]|
+ -+       +  -+  +        +      -+

flows_split = flows.select(
    "*",
    f.split(f.split(f.col("source_ip"), "/")[0], "\.").alias('segmented_source_ip'),
    f.split(f.split(f.col("destination_ip"), "/")[0], "\.").alias('segmented_destination_ip')
)
flows_split.show()
+ -+     -+       +         -+            +
| id|  source_ip|destination_ip|segmented_source_ip|segmented_destination_ip|
+ -+     -+       +         -+            +
|  1|192.168.1.1|   192.168.2.1|   [192, 168, 1, 1]|        [192, 168, 2, 1]|
|  2|192.168.2.1|   192.168.3.1|   [192, 168, 2, 1]|        [192, 168, 3, 1]|
|  3|192.168.3.1|   192.168.1.1|   [192, 168, 3, 1]|        [192, 168, 1, 1]|
+ -+     -+       +         -+            +

最后,根据我的掩码的bits在切片上执行IcrossJoin和过滤,例如:

flows_split.crossJoin(
    networks_masked.select("vlan", "bits", "masked_bits")
).where(
    f.expr("slice(segmented_source_ip, 1, bits)") == f.col("masked_bits")
).show()
+ -+     -+       +         -+            +  -+  +      -+
| id|  source_ip|destination_ip|segmented_source_ip|segmented_destination_ip| vlan|bits|  masked_bits|
+ -+     -+       +         -+            +  -+  +      -+
|  1|192.168.1.1|   192.168.2.1|   [192, 168, 1, 1]|        [192, 168, 2, 1]|VLAN1|   3|[192, 168, 1]|
|  2|192.168.2.1|   192.168.3.1|   [192, 168, 2, 1]|        [192, 168, 3, 1]|VLAN2|   3|[192, 168, 2]|
|  3|192.168.3.1|   192.168.1.1|   [192, 168, 3, 1]|        [192, 168, 1, 1]|VLAN3|   3|[192, 168, 3]|
+ -+     -+       +         -+            +  -+  +      -+

对于destination_ip可以采用完全相同的方法,例如:

flows_split.crossJoin(
    networks_masked.select("vlan", "bits", "masked_bits")
).where(
    f.expr("slice(segmented_destination_ip, 1, bits)") == f.col("masked_bits")
).show()
+ -+     -+       +         -+            +  -+  +      -+
| id|  source_ip|destination_ip|segmented_source_ip|segmented_destination_ip| vlan|bits|  masked_bits|
+ -+     -+       +         -+            +  -+  +      -+
|  1|192.168.1.1|   192.168.2.1|   [192, 168, 1, 1]|        [192, 168, 2, 1]|VLAN2|   3|[192, 168, 2]|
|  2|192.168.2.1|   192.168.3.1|   [192, 168, 2, 1]|        [192, 168, 3, 1]|VLAN3|   3|[192, 168, 3]|
|  3|192.168.3.1|   192.168.1.1|   [192, 168, 3, 1]|        [192, 168, 1, 1]|VLAN1|   3|[192, 168, 1]|
+ -+     -+       +         -+            +  -+  +      -+

最后,您可以在source_ipdestination_ip上将生成的两个表连接在一起(因为您根据需要附加了vlan信息),或者将前面的两个步骤合并在一起,然后crossJoinfilter两次

希望这有帮助

相关问题 更多 >