Python Twitter工具同步搜索和流

2024-10-02 02:24:03 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试制作一个Python服务器,可以从其他应用程序调用它来请求Twitter数据。我通常使用Python作为脚本语言,所以如果有人在我的代码中看到任何危险信号,我会洗耳恭听!在

这基本上就是我到目前为止所拥有的,当我ping服务器时,它会从我的时间轴上得到10条tweet并将它们发送回我的其他应用程序。我的主要问题是我想把流媒体和搜索结合起来。这样我就可以打开一个特定的哈希标记流,我希望它能实时发送给我的其他应用程序,但是我会定期搜索不需要实时发送给我的其他东西。在

我已经成功地分别使用了这两种方法,但不知道如果我想实现这两种方法,我想从何处着手,在本例中,我希望将流功能引入其中。在

我使用的是pythontwitter工具1.10.2-http://mike.verdone.ca/twitter/ 以及python3.3

下面的代码,谢谢!在

在编辑:我是可以通过在if data==“SEARCH_NOW”if语句之后添加twitter流连接来进一步。但这让我想起了最初的问题。一旦twitter流打开,代码似乎就在那里等待。如果我把它放在时间轴查找之前,那么我永远不能调用时间线查找。更新代码以反映。在

编辑2:将搜索请求放在twitter流循环中有点近了。我现在可以打开流,每当我收到一条与搜索词匹配的tweet时,我也可以做一个请求。但仍然不能独立。。。在

文件:网络_设置.py在

#!/usr/bin/env python

#network settings
import socket

#set server variables
TCP_IP = '127.0.0.1'
TCP_PORT = 7001
BUFFER_SIZE  = 20

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)

conn, addr = s.accept()

#print connection address when someone connects
print ('Connection address:', addr)

资料图:twitter_设置.py在

^{pr2}$

文件:python_服务器.py在

#python server 

import json 

from network_settings import *
from twitter_settings import *

search_term = 'test'

while 1:

    tweet_iter = stream.statuses.filter(track = search_term)

    for tweet in tweet_iter:
        # check whether this is a valid tweet
        if tweet.get('text'):

            userName = tweet["user"]["screen_name"]
            userTweet = tweet["text"]

            # now print our tweet
            print ('user: ', userName)
            print ('tweet: ', userTweet)

            #send data back
            delivery1 = json.dumps({'type':'showdown','userName':userName,'userTweet':userTweet})
            conn.send(delivery1.encode('utf-8'))
            data = conn.recv(BUFFER_SIZE)
            data = data.decode('utf-8')

            if data == "SEARCH_NOW": 
                 print ('request newest IDS tweets')

                 x = t.statuses.home_timeline(count=10)

                for i in range(10):
                    try:
                        #print(x[i])
                        userName = x[i]['entities']['user_mentions'][0]['screen_name']
                        userTweet = x[i]['text']
                        print('username: ', userName)
                        print('tweet: ', userTweet)
                        delivery = json.dumps({'type':'display','userName':userName,'userTweet':userTweet})
                        conn.send(delivery.encode('utf-8'))
                    except:
                        print('not valid tweet')

在连接关闭()


Tags: 代码pyimport应用程序dataifsettingsusername
1条回答
网友
1楼 · 发布于 2024-10-02 02:24:03

所以终于找到了解决办法。最后,我使用线程来在它自己的线程中运行流,然后每次执行搜索时都打开另一个线程。不确定是否需要关闭每个线程,或者返回是否处理这些问题。如果有人有什么可以改进的地方,我洗耳恭听!在

代码如下:

#!/usr/bin/env python
#python server 

import json 
import threading
import time
import socket
from twitter import *
import re

#get thread lock ready
thread_lock = threading.Lock()

#set server variables
TCP_IP = '127.0.0.1'
TCP_PORT = 7001
BUFFER_SIZE  = 20


s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)

conn, addr = s.accept()

#print connection address when someone connects
print ('Connection address:', addr)

#fill these in your app!
#twitter auth keys
OAUTH_TOKEN = ''
OAUTH_SECRET = ''
CONSUMER_KEY = ''
CONSUMER_SECRET = ''

t = Twitter(auth=OAuth(OAUTH_TOKEN, OAUTH_SECRET, CONSUMER_KEY, CONSUMER_SECRET))

auth = OAuth(OAUTH_TOKEN, OAUTH_SECRET, CONSUMER_KEY, CONSUMER_SECRET)
stream = TwitterStream(auth = auth, secure = True)


#twitter functions
def pythonSearch():
    #lock thread to not interrupt search results
    thread_lock.acquire()

    print ('request newest tweets')

    #get 10 things from timeline
    x = t.statuses.home_timeline(count=10)

    for i in range(10):
        try:
            #get username and tweet
            userName = x[i]['entities']['user_mentions'][0]['screen_name']
            userTweet = x[i]['text']

            #print out values
            print('username: ', userName)
            print('tweet: ', userTweet)

            #send json back
            delivery = json.dumps({'type':'display','userName':userName,'userTweet':userTweet})
            conn.send(delivery.encode('utf-8'))
        except:
            #not a retweet
            print('not valid tweet')

    #unlock thread when finished
    thread_lock.release()

    return


def pythonStream():
    #open stream looking for search_term
    search_term = 'TESTING'
    tweet_iter = stream.statuses.filter(track = search_term)

    for tweet in tweet_iter:
        # check whether this is a valid tweet
        if tweet.get('text'):

            #get username and tweet
            userName = tweet["user"]["screen_name"]
            userTweet = tweet["text"]

            # now print our tweet
            print ('user: ', userName)
            print ('tweet: ', userTweet)

            #send json back
            delivery1 = json.dumps({'type':'showdown','userName':userName,'userTweet':userTweet})
            conn.send(delivery1.encode('utf-8'))


#start main loop
while 1:
    #listen for calls
    data = conn.recv(BUFFER_SIZE)
    data = data.decode('utf-8')

    #if someone calls search, do a search
    if data == 'SEARCH':
        threading.Thread(target = pythonSearch).start()


    if data == 'STREAM':
        threading.Thread(target = pythonStream).start()


conn.close()

相关问题 更多 >

    热门问题