codernitydb 使用字符串值的索引

1 投票
2 回答
1588 浏览
提问于 2025-04-18 02:22

我正在使用CodernityDB这个数据库,编程语言是Python。

我想创建一个表格,其中有一个索引是字符串类型,而不是整数类型。

我该怎么做呢?

这是我正在参考的教程:

#!/usr/bin/env python
from CodernityDB.database import Database
from CodernityDB.hash_index import HashIndex


class WithXIndex(HashIndex):

    def __init__(self, *args, **kwargs):
        kwargs['key_format'] = 'I'
        super(WithXIndex, self).__init__(*args, **kwargs)

    def make_key_value(self, data):
        a_val = data.get("x")
        if a_val is not None:
            return a_val, None
        return None

    def make_key(self, key):
        return key


def main():
    db = Database('/tmp/tut2')
    db.create()
    x_ind = WithXIndex(db.path, 'x')
    db.add_index(x_ind)

    for x in xrange(100):
        db.insert(dict(x=x))

    for y in xrange(100):
        db.insert(dict(y=y))

    print db.get('x', 10, with_doc=True)


if __name__ == '__main__':
    main()

简单来说,我想在x里面放字符串值,而不是整数值,但当我尝试这样做时,出现了一个错误提示:“无法将参数转换为整数”。

我相信这应该是个很简单的事情,但我在他们的文档里找不到相关信息。

也许我只需要在make_key_value这个函数里加一个字符串转换,但我不确定这样做是否有效。

2 个回答

0

这里有一些例子,可以去看看 minitwit.py 里是怎么调用 database_indexes.py 的。

在 database_indexes.py 里,有一些方法可以把数据以键值对的形式存储。

minitwit.py 会调用这些方法,把数据插入到数据库中,具体可以看看注册功能是怎么实现的。

我正在努力自己弄明白这些内容。

3

这里是迷你推特功能的简化版,我用它来做一些实验。

createDB,顾名思义,就是用来创建一个数据库,并且可以让你往里面填充数据。

requestDat,里面有一些查询数据库的示例。如果你想到更多可以用来精确查找数据的查询,比如通过一个ID索引来查找,请分享出来。

createDB.py

from hashlib import md5
from werkzeug import check_password_hash, generate_password_hash
from CodernityDB.database_thread_safe import ThreadSafeDatabase
from CodernityDB.database import RecordNotFound



#indexTypes

from CodernityDB.hash_index import HashIndex
from CodernityDB.tree_index import TreeBasedIndex

class UserIndex(HashIndex):

    def __init__(self, *args, **kwargs):
        kwargs['key_format'] = '16s'
        super(UserIndex, self).__init__(*args, **kwargs)

    def make_key_value(self, data):
        if data['t'] == 'user':
            username = data['username']
            return md5(username).digest(), {'user_id': data['user_id'], 'email': data['email']}

    def make_key(self, key):
        return md5(key).digest()


class UserIDIndex(HashIndex):

    def __init__(self, *args, **kwargs):
        kwargs['key_format'] = 'I'
        super(UserIDIndex, self).__init__(*args, **kwargs)

    def make_key_value(self, data):
        if data['t'] == 'user':
            user_id = data['user_id']
            return user_id, {'username': data['username'], 'email': data['email']}

    def make_key(self, key):
        return key



#Functions
def register(username_input,email_input,password_input):
    """Registers the user."""

    try:
        cdb.get('user', username_input, with_storage=False)
    except RecordNotFound:
        cdb.insert(dict(
                    t='user',
                    user_id=cdb.count(cdb.all, 'user') + 1,  # do not use in production!
                    username=username_input,
                    email=email_input,
                    pw_hash=generate_password_hash(password_input)))
        print('You were successfully registered')
    else:
        print('The username is already taken')





# configuration
DATABASE = 'testDB'
SECRET_KEY = 'development key'
cdb = ThreadSafeDatabase(DATABASE)


#main
def main():
    if cdb.exists():
        cdb.open()
        cdb.reindex()
    else:
        #from database_indexes import UserIndex, MessageAllIndex, MessageUserIndex, FollowerRel1Index, FollowerRel2Index, UserIDIndex, FollowerIndex
        cdb.create()
        cdb.add_index(UserIndex(cdb.path, 'user'))
        cdb.add_index(UserIDIndex(cdb.path, 'user_id'))

    #test insert
    username_input="none"
    print("type 'n' to end database population")
    while username_input != "n":
        username_input = raw_input("new username: ")
        email_input = raw_input("new email: ")
        password_input = raw_input("new password: ")
        register(username_input,email_input,password_input)


#Run Main
if __name__ == '__main__':
    main()

requestDat.py

from hashlib import md5

from werkzeug import check_password_hash, generate_password_hash


from CodernityDB.database_thread_safe import ThreadSafeDatabase
from CodernityDB.database import RecordNotFound



#indexTypes

from CodernityDB.hash_index import HashIndex
from CodernityDB.tree_index import TreeBasedIndex




def get_user_id(username):
    """Convenience method to look up the id for a username."""
    try:
        rv = cdb.get('user', username, with_storage=True)
    except RecordNotFound:
        rv = None
    else:
        rv = rv['user_id']
    return rv

def get_id(username):
    """Convenience method to look up the id for a username."""
    try:
        rv = cdb.get('id', username, with_storage=True)
    except RecordNotFound:
        rv = None
    else:
        rv = rv['id']
    return rv

def get_username(user_id):
    """Convenience method to look up the id for a username."""
    try:
         rv = cdb.get('user', user_id, with_storage=True)
        # rv= user['doc']['pw_hash']
    except RecordNotFound:
        rv = None
    else:
        rv = rv['user_id']
    return rv

def get_allRecords(getRecord):
    """Convenience method to look up the id for a username."""
    try:
        #rv = cdb.get('username', user_id, with_storage=True)
        #cdb.all(getRecord)
        rv="\n"
        for curr in cdb.all(getRecord):
            rv = rv+str(curr)
    except RecordNotFound:
        rv = None
    else:
        rv = "all"
    return rv


# configuration
DATABASE = 'testDB'
SECRET_KEY = 'development key'
cdb = ThreadSafeDatabase(DATABASE)


#main
def main():
    if cdb.exists():
        cdb.open()
        cdb.reindex()
    else:
        #from database_indexes import UserIndex, MessageAllIndex, MessageUserIndex, FollowerRel1Index, FollowerRel2Index, UserIDIndex, FollowerIndex
        print("no database pressent")
        quit()

    print("type'n' to advance through prompts")

    useridRequest_input ="none"
    while useridRequest_input != "n":
        useridRequest_input = raw_input("get ID, search username: ")
        print(get_user_id(useridRequest_input))

    usernameRequest_input ="none"    
    while usernameRequest_input != "n":
        usernameRequest_input = raw_input("get username, search ID: ")
        print(get_username(usernameRequest_input))



    print ("counting id tags")
    print cdb.count(cdb.all, 'id')
    print("counting user_id tags")
    print cdb.count(cdb.all, 'user_id')
    for curr in cdb.all('id'):
        print curr


#Run Main
if __name__ == '__main__':
    main()

撰写回答