Java: how to connect this Twitter API code to a KafkaProducer

Hi everyone, can someone help me connect this Java code to Kafka?

Thanks in advance.

package example.producer;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.scribe.builder.*;
import org.scribe.builder.api.*;
import org.scribe.model.*;
import org.scribe.oauth.*;

public class TwitterStreamConsumer extends Thread {

    private static final String STREAM_URI = "https://stream.twitter.com/1.1/statuses/filter.json";

    public void run(){
        try{
            System.out.println("Starting Twitter public stream consumer thread.");

            // Enter your consumer key and secret below
            OAuthService service = new ServiceBuilder()
                    .provider(TwitterApi.class)
                    .apiKey("xxxxx")
                    .apiSecret("xxxxx")
                    .build();

            // Set your access token
            Token accessToken = new Token("xxxxx", "xxxxxx");

            // Let's generate the request
            //System.out.println("Connecting to Twitter Public Stream");
            OAuthRequest request = new OAuthRequest(Verb.POST, STREAM_URI);
            request.addHeader("version", "HTTP/1.1");
            request.addHeader("host", "stream.twitter.com");
            request.setConnectionKeepAlive(true);
            request.addHeader("user-agent", "Twitter Stream Reader");
            request.addBodyParameter("track", "**screenname**"); // Set keywords you'd like to track here
            service.signRequest(accessToken, request);
            Response response = request.send();

            // Create a reader to read Twitter's stream
            BufferedReader reader = new BufferedReader(new InputStreamReader(response.getStream()));

            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        catch (IOException ioe){
            ioe.printStackTrace();
        }

    }

public static void main(String[] args){

    final TwitterStreamConsumer streamConsumer = new TwitterStreamConsumer(); // final because we will later pull the latest Tweet
    streamConsumer.start();
}
}

Hi everyone, can anyone suggest how to connect this Java code to Apache Kafka? I have tried many approaches, but none of them work. Can someone help?

Thanks in advance.


4 Answers

  1. Answer #1

    The getStream() method of the Response class is declared private, so it cannot be accessed to obtain the input stream.

    package example.producer;

    import java.io.BufferedReader;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import java.util.Scanner;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    import org.scribe.builder.*;
    import org.scribe.builder.api.*;
    import org.scribe.model.*;
    import org.scribe.oauth.*;

    // Wrapped in a class with a main method so the snippet compiles on its own
    public class TwitterTimelineProducer {

        private static final String topic = "twitter-feed-topic";

        private static final String STREAM_URI = "https://api.twitter.com/1.1/statuses/home_timeline.json";

        public static void main(String[] args) {
            System.out.println("Starting Twitter public stream consumer thread");

            // Configure the (old) Kafka producer
            Properties properties = new Properties();
            properties.put("metadata.broker.list", "localhost:9092");
            properties.put("serializer.class", "kafka.serializer.StringEncoder");

            ProducerConfig config = new ProducerConfig(properties);

            Producer<String, String> producer = new Producer<String, String>(config);

            OAuthService service = new ServiceBuilder()
                    .provider(TwitterApi.class)
                    .apiKey("Api Key")
                    .apiSecret("Api Secret key")
                    .build();

            Scanner in = new Scanner(System.in);

            // Obtain the request token
            Token requestToken = service.getRequestToken();

            // Authorize scribe here: open the URL and copy the PIN
            System.out.println(service.getAuthorizationUrl(requestToken));

            System.out.println("enter the verifier number ");
            System.out.print(">>");
            Verifier verifier = new Verifier(in.nextLine());

            // Exchange the request token for an access token
            Token accessToken = service.getAccessToken(requestToken, verifier);

            OAuthRequest request = new OAuthRequest(Verb.GET, STREAM_URI);
            service.signRequest(accessToken, request);
            Response response = request.send();

            System.out.println(response.getBody());

            // getStream() is not accessible, so wrap the response body in a stream instead
            InputStream is = new ByteArrayInputStream(response.getBody().getBytes(StandardCharsets.UTF_8));
            BufferedReader inputReader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
            String line;
            try {
                while ((line = inputReader.readLine()) != null) {
                    KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, line);
                    producer.send(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    The Twitter-to-Kafka producer example above works fine. I hope this helps.
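
    The snippet above uses the old Scala producer API (kafka.javaapi.producer.Producer), which has since been deprecated and removed from newer Kafka releases. As a rough sketch only, assuming a kafka-clients 0.9+ dependency and the same localhost broker, the equivalent send with the newer org.apache.kafka.clients.producer.KafkaProducer could look like this:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical class name, for illustration only
    public class TweetProducerSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            // bootstrap.servers replaces metadata.broker.list from the old API
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
                // Each line read from the Twitter response would be sent like this
                String line = "{\"text\": \"example tweet\"}";
                producer.send(new ProducerRecord<String, String>("twitter-feed-topic", line));
            }
        }
    }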

  2. Answer #2

    Kafka producer using Twitter hashtags

    package com.multipleproducer.sparkstreaming.Multiplekafkaproducersparkstreaming;

    import java.util.Properties;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    import com.google.common.collect.Lists;
    import com.twitter.hbc.ClientBuilder;
    import com.twitter.hbc.core.Client;
    import com.twitter.hbc.core.Constants;
    import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
    import com.twitter.hbc.core.processor.StringDelimitedProcessor;
    import com.twitter.hbc.httpclient.auth.Authentication;
    import com.twitter.hbc.httpclient.auth.OAuth1;
    
    /**
     * Hello world!
     *
     */
    public class TwitterHash_tag 
    {
        private static final String topic = "Hash_tag";
    
        public static void run(String consumerKey, String consumerSecret,
                String token, String secret) throws InterruptedException {
    
            Properties properties = new Properties();
            properties.put("metadata.broker.list", "localhost:9092");
            properties.put("serializer.class", "kafka.serializer.StringEncoder");
            properties.put("client.id","camus");
            ProducerConfig producerConfig = new ProducerConfig(properties);
    
            Producer<String, String> producer = new Producer<String, String>(producerConfig);
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
            StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
    
            // add some track terms in hashtag
            endpoint.trackTerms(Lists.newArrayList("",
                    "#India"));
    
    
            Authentication auth = new OAuth1(consumerKey, consumerSecret, token,
                    secret);
            // Authentication auth = new BasicAuth(username, password);
    
            // Create a new BasicClient. By default gzip is enabled.
            Client client = new ClientBuilder().hosts(Constants.STREAM_HOST)
                    .endpoint(endpoint).authentication(auth)
                    .processor(new StringDelimitedProcessor(queue)).build();
    
            // Establish a connection
            client.connect();
    
            // Do whatever needs to be done with messages
            for (int msgRead = 0; msgRead < 10; msgRead++) {
                KeyedMessage<String, String> message = null;
                try {
                    message = new KeyedMessage<String, String>(topic, queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                producer.send(message);
                System.out.println(message);
            }
            producer.close();
            client.stop();
    
        }
    
        public static void main( String[] args ) throws InterruptedException
        {
    
            TwitterHash_tag.run("consumerKey", "consumerSecretkey",
                    " AccessToken", "AccessTokenSecret");
    
        }
    }
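
    To verify that the hosebird client above is actually delivering messages to the Hash_tag topic, a minimal consumer sketch might look like the following (assuming a kafka-clients 2.x dependency; the class name and consumer group are just for illustration):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class HashTagConsumerSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "hashtag-check"); // hypothetical consumer group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
                consumer.subscribe(Collections.singletonList("Hash_tag"));
                // Print whatever the producer wrote to the topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            }
        }
    }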
    
  3. Answer #3

    package example.producer;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Properties;

    import org.scribe.builder.*;
    import org.scribe.builder.api.*;
    import org.scribe.model.*;
    import org.scribe.oauth.*;

    public class TwitterStreamConsumer7 extends Thread {

        private static final String STREAM_URI = "https://stream.twitter.com/1.1/statuses/filter.json";

        public void run() {
            try {
                // Read the screen names / keywords to track from a CSV file, one per line
                FileReader f = new FileReader("/home/trainings/Desktop/Streamin/input1.csv");
                BufferedReader bf = new BufferedReader(f);
                String Screen_Name = "";
                while ((Screen_Name = bf.readLine()) != null) {
                    System.out.println("Starting Twitter public stream consumer thread.");
                    System.out.println(Screen_Name);

                    // Enter your consumer key and secret below
                    OAuthService service = new ServiceBuilder()
                            .provider(TwitterApi.class)
                            .apiKey("ZRra7TMrssssssssssssssssxxxxxxxxxxx")
                            .apiSecret("LgUhEY4R8xxxxxxxxxxxxxxxxxxQw069D")
                            .build();

                    // Set your access token
                    Token accessToken = new Token("349211dddddddddddddddhyv4AL01lMRVN", "gqNqPuWoSxxxxxxxxxxxxxxxxxxxxkz1xCrzxWMUgd3kZ");

                    Properties props = new Properties();
                    props.put("metadata.broker.list", "localhost:9092");
                    props.put("serializer.class", "kafka.serializer.StringEncoder");
                    props.put("partitioner.class", "example.producer.SimplePartitioner");
                    props.put("request.required.acks", "1");
                    props.put("retry.backoff.ms", "150");
                    props.put("message.send.max.retries", "10");
                    props.put("topic.metadata.refresh.interval.ms", "0");

                    ProducerConfig config = new ProducerConfig(props);

                    final Producer<String, String> producer = new Producer<String, String>(config);

                    // Let's generate the request
                    OAuthRequest request = new OAuthRequest(Verb.POST, STREAM_URI);
                    request.addHeader("version", "HTTP/1.1");
                    request.addHeader("host", "stream.twitter.com");
                    request.setConnectionKeepAlive(true);
                    request.addHeader("user-agent", "Twitter Stream Reader");
                    request.addBodyParameter("track", Screen_Name); // Set keywords you'd like to track here
                    service.signRequest(accessToken, request);
                    Response response = request.send();

                    // Create a reader to read Twitter's stream
                    BufferedReader reader = new BufferedReader(new InputStreamReader(response.getStream()));

                    // Forward every line of the stream to the twitter_events5 topic
                    String line;
                    while ((line = reader.readLine()) != null) {
                        KeyedMessage<String, String> data = new KeyedMessage<String, String>("twitter_events5", line);
                        System.out.println(line);
                        producer.send(data);
                        Thread.sleep(1000);
                    }
                }
            } catch (IOException ioe) {
                ioe.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {

            TwitterStreamConsumer7 streamConsumer = new TwitterStreamConsumer7();
            streamConsumer.start();

        }
    }

    This solved the problem and should help.
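
    Note that the producer config above references example.producer.SimplePartitioner, which is not shown in the answer. A minimal sketch of such a class, assuming the Kafka 0.8-style old producer API used here, could be:

    package example.producer;

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    public class SimplePartitioner implements Partitioner {

        // The old producer instantiates partitioners reflectively and passes
        // the producer properties to this constructor.
        public SimplePartitioner(VerifiableProperties props) {
        }

        public int partition(Object key, int numPartitions) {
            // Spread messages over partitions by the key's hash; the mask keeps the value non-negative
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }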

  4. Answer #4

    Kafka producer using Spark Streaming

    package com.multipleproducer.sparkstreaming.Multiplekafkaproducersparkstreaming;
    
    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import twitter4j.FilterQuery;
    import twitter4j.StallWarning;
    import twitter4j.Status;
    import twitter4j.StatusDeletionNotice;
    import twitter4j.StatusListener;
    import twitter4j.TwitterStream;
    import twitter4j.TwitterStreamFactory;
    import twitter4j.conf.ConfigurationBuilder;
    
    public class Twitterdata {
    
        public static void Run(String ConsumerKey, String ConsumerSecret,
                String AccessToken, String AccessTokenSecret  ) {
    
    
            Properties properties = new Properties();
            properties.put("metadata.broker.list", "localhost:9092");
            properties.put("serializer.class", "kafka.serializer.StringEncoder");
            properties.put("client.id","camus");
            ProducerConfig producerConfig = new ProducerConfig(properties);
            final Producer<String, String> producer = new Producer<String, String>(producerConfig);
    
            ConfigurationBuilder cb = new ConfigurationBuilder();
            cb.setDebugEnabled(true);
            cb.setOAuthConsumerKey(ConsumerKey);
            cb.setOAuthConsumerSecret(ConsumerSecret);
            cb.setOAuthAccessToken(AccessToken);
            cb.setOAuthAccessTokenSecret(AccessTokenSecret);
            TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
            //kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
    
    
            System.out.println("##################### TWITTER___ STARTED ###########################");
            StatusListener listener = new StatusListener() {
    
                public void onDeletionNotice(
                        StatusDeletionNotice statusDeletionNotice) {
                    // TODO Auto-generated method stub
    
                }
    
                public void onException(Exception ex) {
                    // TODO Auto-generated method stub
    
                }
    
                public void onScrubGeo(long userId, long upToStatusId) {
                    // TODO Auto-generated method stub
    
                }
    
                public void onStallWarning(StallWarning warning) {
                    // TODO Auto-generated method stub
    
                }
    
                public void onStatus(Status data) {
                    // TODO Auto-generated method stub
    
                    twitter4j.User user = data.getUser();
                    String userdata = user.toString();
                    userdata = userdata.replaceAll("UserJSONImpl", "");
                    System.out.println();
    
                    String topic="twitterdata";
                    KeyedMessage<String, String> info = new KeyedMessage<String,String>(topic,userdata); 
                    producer.send(info);
                    System.out.println(info);
    
    
                    /*JSONObject obj=new JSONObject();
                    obj.put("UserId", user.getId());
                    obj.put("Name",user.getName());
                    obj.put("ScreenName",user.getScreenName());
                    obj.put("CreatedAt",user.getCreatedAt());
                    obj.put("Location",user.getLocation());
                    obj.put("TimeZone",user.getTimeZone() );
                    obj.put("Lang",user.getLang());
                    obj.put("UtcOffset",user.getUtcOffset());
                    obj.put("Description",user.getDescription());
                    obj.put("FavouritesCount", user.getFavouritesCount());
                    obj.put("FollowersCount",user.getFollowersCount());
                    obj.put("FriendsCount",user.getFriendsCount());
                    obj.put("ListedCount", user.getListedCount());
                    obj.put("URL",user.getURL());
                    obj.put("StatusesCount",user.getStatusesCount());
                    obj.put("OriginalProfileImageURL",user.getOriginalProfileImageURLHttps());
                    obj.put("Tweets",data.getText());
                    obj.put("CurrentUserRetweetId",data.getCurrentUserRetweetId());
                    obj.put("InReplyToUserId",data.getInReplyToUserId());
                    obj.put("InReplyToScreenName", data.getInReplyToScreenName());
                    obj.put("getSource", data.getSource());
    
                    System.out.println(obj);*/
    
                }
    
                public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                    // TODO Auto-generated method stub
    
                }
            };
            FilterQuery fq = new FilterQuery();
            String keywords[] = { "BigTappIndia" };
    
            fq.track(keywords);
            // track Singapore location
            //double[][] location = { { Latitude }, { Longitude } };
            //fq.locations(location);
            twitterStream.addListener(listener);
            twitterStream.filter(fq);
            // twitterStream.sample(); // sample() would replace the filtered stream started above; use either filter() or sample(), not both
    
        }
    
        public static void main(String[] args) {

            Twitterdata.Run("consumerkey", "ConsumerSecret",
                    "AccessToken", "AccessTokenSecret");

        }
    
    
    }
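
    Since only onStatus does any real work in the listener above, a more compact variant can extend twitter4j.StatusAdapter, which provides empty implementations of the other StatusListener callbacks. A hedged sketch under the same assumptions (local broker, placeholder credentials, hypothetical class name):

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import twitter4j.FilterQuery;
    import twitter4j.Status;
    import twitter4j.StatusAdapter;
    import twitter4j.TwitterStream;
    import twitter4j.TwitterStreamFactory;
    import twitter4j.conf.ConfigurationBuilder;

    public class TwitterdataAdapterSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            final Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));

            ConfigurationBuilder cb = new ConfigurationBuilder();
            cb.setOAuthConsumerKey("consumerKey")
              .setOAuthConsumerSecret("consumerSecret")
              .setOAuthAccessToken("accessToken")
              .setOAuthAccessTokenSecret("accessTokenSecret");

            TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

            // StatusAdapter leaves all callbacks empty except the ones we override
            twitterStream.addListener(new StatusAdapter() {
                @Override
                public void onStatus(Status status) {
                    producer.send(new KeyedMessage<String, String>("twitterdata", status.getUser().toString()));
                }
            });

            // Track the same keyword as the answer above
            twitterStream.filter(new FilterQuery().track(new String[] { "BigTappIndia" }));
        }
    }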