如何从sql查询中提取表名和列名?

2024-10-01 07:10:34 发布

您现在位置:Python中文网/ 问答频道 /正文

假设我们有这样一个简单的查询:

Select a.col1, b.col2 from tb1 as a inner join tb2 as b on tb1.col7 = tb2.col8;

结果应该是这样的:

tb1 col1
tb1 col7
tb2 col2
tb2 col8

我试着用一些python库来解决这个问题:

1)即使只使用sqlparse提取表也可能是个大问题。例如this官方书籍根本不能正常工作。

2)使用正则表达式似乎很难实现。

3)但后来我发现了this,这可能会有帮助。但是问题是我无法连接到任何数据库并执行该查询。

有什么想法吗?


Tags: from官方onasthisselectcol2col1
2条回答

我正在处理一个类似的问题,并找到了一个更简单的解决方案,它似乎工作得很好。

import re

def tables_in_query(sql_str):

    # remove the /* */ comments
    q = re.sub(r"/\*[^*]*\*+(?:[^*/][^*]*\*+)*/", "", sql_str)

    # remove whole line -- and # comments
    lines = [line for line in q.splitlines() if not re.match("^\s*(--|#)", line)]

    # remove trailing -- and # comments
    q = " ".join([re.split("--|#", line)[0] for line in lines])

    # split on blanks, parens and semicolons
    tokens = re.split(r"[\s)(;]+", q)

    # scan the tokens. if we see a FROM or JOIN, we set the get_next
    # flag, and grab the next one (unless it's SELECT).

    table = set()
    get_next = False
    for tok in tokens:
        if get_next:
            if tok.lower() not in ["", "select"]:
                table.add(tok)
            get_next = False
        get_next = tok.lower() in ["from", "join"]

    dictTables = dict()
    for table in tables:
        fields = []
        for token in tokens:
            if token.startswith(table):
                if token != table:
                    fields.append(token)
        if len(list(set(fields))) >= 1:
        dictTables[table] = list(set(fields))
    return dictTables

代码改编自https://grisha.org/blog/2016/11/14/table-names-from-sql/

真的,这不是一件容易的事。您可以使用lexer(ply在本例中)并定义几个规则,以从字符串中获取多个标记。下面的代码为SQL字符串的不同部分定义这些规则,并将它们放回一起,因为输入字符串中可能有别名。结果,您将得到一个以不同表名作为键的字典(result)。

import ply.lex as lex, re

tokens = (
    "TABLE",
    "JOIN",
    "COLUMN",
    "TRASH"
)

tables = {"tables": {}, "alias": {}}
columns = []

t_TRASH = r"Select|on|=|;|\s+|,|\t|\r"

def t_TABLE(t):
    r"from\s(\w+)\sas\s(\w+)"

    regex = re.compile(t_TABLE.__doc__)
    m = regex.search(t.value)
    if m is not None:
        tbl = m.group(1)
        alias = m.group(2)
        tables["tables"][tbl] = ""
        tables["alias"][alias] = tbl

    return t

def t_JOIN(t):
    r"inner\s+join\s+(\w+)\s+as\s+(\w+)"

    regex = re.compile(t_JOIN.__doc__)
    m = regex.search(t.value)
    if m is not None:
        tbl = m.group(1)
        alias = m.group(2)
        tables["tables"][tbl] = ""
        tables["alias"][alias] = tbl
    return t

def t_COLUMN(t):
    r"(\w+\.\w+)"

    regex = re.compile(t_COLUMN.__doc__)
    m = regex.search(t.value)
    if m is not None:
        t.value = m.group(1)
        columns.append(t.value)
    return t

def t_error(t):
    raise TypeError("Unknown text '%s'" % (t.value,))
    t.lexer.skip(len(t.value))

# here is where the magic starts
def mylex(inp):
    lexer = lex.lex()
    lexer.input(inp)

    for token in lexer:
        pass

    result = {}
    for col in columns:
        tbl, c = col.split('.')
        if tbl in tables["alias"].keys():
            key = tables["alias"][tbl]
        else:
            key = tbl

        if key in result:
            result[key].append(c)
        else:
            result[key] = list()
            result[key].append(c)

    print result
    # {'tb1': ['col1', 'col7'], 'tb2': ['col2', 'col8']}    

string = "Select a.col1, b.col2 from tb1 as a inner join tb2 as b on tb1.col7 = tb2.col8;"
mylex(string)

相关问题 更多 >