Mapping a PySpark RDD twice to group by different keys

I have the following RDD:

timeRange = (access_logs
             .map(lambda log: (log.date_time, 1))   # key each log entry by its timestamp
             .reduceByKey(lambda a, b: a + b)        # count entries per timestamp
             .map(lambda s: s)                       # no-op pass-through
             .take(2000))

print("IpAddresses by time range: {}".format(timeRange))

My parsing code looks like this:

import re
from pyspark.sql import Row

def parse_apache_log_line(logline):
    # APACHE_ACCESS_LOG_PATTERN is a regex with nine capture groups (defined elsewhere)
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        raise Exception("Invalid logline: %s" % logline)
    return Row(
        ip_address    = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date_time     = match.group(4),
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = int(match.group(9))
    )

A sample line from the log file:

129.192.176.24 - - [25/May/2015:23:11:16 +0000] "GET / HTTP/1.0" 200 3557 "-" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; snprtz|S26320700000083|2600#Service Pack 1#2#5#154321|isdn)"
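
For context, the question does not show how access_logs is built or what APACHE_ACCESS_LOG_PATTERN contains. Below is a minimal sketch of both, assuming the logs are read with sc.textFile; the regex and the file path are hypothetical, chosen only so that the nine capture groups line up with the Row fields above and match the sample line.

from pyspark import SparkContext

# Hypothetical pattern: nine capture groups matching the Row fields above
APACHE_ACCESS_LOG_PATTERN = (
    r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] '
    r'"(\S+) (\S+)\s*(\S*)" (\d{3}) (\d+)'
)

sc = SparkContext(appName="apache-log-grouping")

# Hypothetical path; each line is parsed into a Row by the function above
access_logs = (sc.textFile("access.log")
               .map(parse_apache_log_line)
               .cache())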

I want to group and display by timestamp, and then, within each timestamp, group and display by IP address together with its count. Right now I can key by timestamp and get something like ('25/May/2015:23:11:15 +0000', 1995), but what I'm looking for is something like ('25/May/2015:23:11:15 +0000', ('1.2.3.4', 20)).


1 Answer

You simply need to reduce by the composite key (date_time, ip_address) in the first step, and then group by date_time.

Try this:

timeRange = (access_logs
             .map(lambda log: ((log.date_time, log.ip_address), 1))
             .reduceByKey(lambda a, b: a + b)
             .map(lambda x: (x[0][0], (x[0][1], x[1])))  # <=> (date_time, (ip_address, count))
             .groupByKey()
             .map(lambda x: (x[0], list(x[1])))  # groupByKey yields a ResultIterable, so convert it to a list
            )
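
To make the shape of the result concrete, here is a short driver sketch; the sample output values are invented for illustration, not taken from the question's data.

# Each record is (date_time, [(ip_address, count), ...])
for date_time, ip_counts in timeRange.take(5):
    # Optionally sort IPs by descending count within each timestamp
    top_ips = sorted(ip_counts, key=lambda pair: pair[1], reverse=True)
    print(date_time, top_ips)

# Expected shape of one record, e.g.:
# ('25/May/2015:23:11:15 +0000', [('1.2.3.4', 20), ('5.6.7.8', 3)])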
