Ryan Bury

Malicious Twitter User Detection

Python Tweepy Output

Download Source Code



Tutorial

This tutorial on using Tweepy, Azure, and Python to detect malicious users is split into four parts, one of which covers how to view the output effectively in Power BI.

  1. Twitter and Tweepy
  2. Azure Stream Analytics
  3. Viewing Output with Power BI
  4. Custom Twitter Application

Download Tutorials



TwitterStreamAnalytics.py

This is the only source code file; it contains almost everything you need to run the program. For a more complete description, work through the tutorial.

import tweepy
import json
from azure.servicebus import ServiceBusService
import time
import datetime

# Initialize Twitter rate limit variables
# TwitterRESTLimit is the request count at which the stream listener pauses;
# TwitterRESTTimeLimit is the length of the rate-limit window.
TwitterRESTLimit = 149
TwitterRESTTimeLimit = 15  # In Minutes
# Number of api.user_timeline calls made so far; incremented by the listener.
requests = 0
# Reference point used by the listener to compute the elapsed time.
start_time = time.time()

# Authenticate Tweepy
# NOTE(review): the four credentials below are empty placeholders --
# presumably they must be filled in with the Twitter application's keys
# before the script can authenticate.
consumer_key = ''
consumer_secret = ''
access_token = ''
access_token_secret = ''

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

# REST client used by the listener to fetch user timelines; its auth handler
# is also reused when the stream is created.
api = tweepy.API(auth)

# Authenticate Azure Service Bus
# NOTE(review): namespace and shared-access key are empty placeholders as
# well -- presumably to be filled in with the Azure resource's values.
service_namespace = ''
key_name = ''
key_value = ''
# Client the listener uses to send each user profile as an event.
sbs = ServiceBusService(service_namespace, shared_access_key_name=key_name, shared_access_key_value=key_value)

def find_link_count(status):
    """Return how many URL entities are attached to *status*."""
    url_entities = status.entities['urls']
    return len(url_entities)


def find_reply_count(status):
    """Return 1 when *status* replies to some user, otherwise 0."""
    is_reply = status.in_reply_to_user_id is not None
    return 1 if is_reply else 0


def find_trend_count(status):
    """Return how many hashtag entities are attached to *status*."""
    hashtag_entities = status.entities['hashtags']
    return len(hashtag_entities)


def find_at_count(status):
    """Return how many user-mention entities are attached to *status*."""
    mention_entities = status.entities['user_mentions']
    return len(mention_entities)


class MyStreamListener(tweepy.StreamListener):
    """Profile the author of every streamed tweet and forward the result.

    For each status received, the author's recent timeline is fetched through
    the module-level ``api`` client, per-user counts and ratios are computed,
    and the resulting profile is sent as JSON to the 'twitterhub' event hub
    through the module-level ``sbs`` client.

    The listener also tracks how many timeline requests have been made in the
    current rate-limit window (module globals ``requests`` and ``start_time``)
    and pauses the stream when ``TwitterRESTLimit`` is reached.
    """

    @staticmethod
    def _ratio(count, total):
        """Return ``count / total``, or 0 when ``total`` is 0."""
        return count / total if total else 0

    def on_status(self, status):
        """Build and publish a profile for the author of *status*.

        Args:
            status: The streamed tweet; its ``user`` and ``coordinates``
                attributes are read.
        """
        # ``global`` must precede any use of these names in the function;
        # Python 3.6+ rejects the declaration otherwise.
        global requests
        global start_time
        global myStream

        window_seconds = 60 * TwitterRESTTimeLimit

        # Start a fresh window once the previous one has expired; otherwise
        # the counter would keep growing and the limit check below could
        # never fire again.
        if time.time() - start_time > window_seconds:
            requests = 0
            start_time = time.time()

        user = status.user
        print("Collecting data on ", user.name)

        followers = user.followers_count
        # A tiny stand-in keeps followers/following defined for users who
        # follow nobody.
        following = user.friends_count if user.friends_count != 0 else .000001

        timeline = api.user_timeline(user.id, count=50)
        requests += 1

        print(requests)

        if status.coordinates is not None:
            coordinates = status.coordinates['coordinates']
        else:
            coordinates = None

        tweet_count = 0
        link_count = 0
        reply_count = 0
        trend_count = 0
        at_count = 0

        # A distinct loop name avoids shadowing the ``status`` parameter.
        for tweet in timeline:
            tweet_count += 1
            link_count += find_link_count(tweet)
            reply_count += find_reply_count(tweet)
            trend_count += find_trend_count(tweet)
            at_count += find_at_count(tweet)

        print('Total Tweet Count: ', tweet_count)
        print('Total Link Count: ', link_count)
        print('Total Reply Count: ', reply_count)
        print('Total Trend Count: ', trend_count)
        print('Total @ Count: ', at_count)
        print()

        # _ratio guards against an empty timeline (tweet_count == 0).
        user_profile = {
            'name': user.name,
            'date_time': datetime.datetime.now().isoformat(),
            'coordinates': coordinates,
            'followers_ratio': followers / following,
            'tweet_count': tweet_count,
            'link_count': link_count,
            'reply_count': reply_count,
            'trend_count': trend_count,
            'at_count': at_count,
            'link_ratio': self._ratio(link_count, tweet_count),
            'reply_ratio': self._ratio(reply_count, tweet_count),
            'at_ratio': self._ratio(at_count, tweet_count),
            'trend_ratio': self._ratio(trend_count, tweet_count),
        }
        payload = json.dumps(user_profile)
        print(payload)
        sbs.send_event('twitterhub', payload)
        print()

        # check for Twitter rate limits
        elapsed_time = time.time() - start_time
        if requests >= TwitterRESTLimit and elapsed_time <= window_seconds:
            print('Connection Temporarily Disconnected')
            myStream.disconnect()
            myStream = None
            print('***** SLEEPING *****')
            print('Sleeping for ', window_seconds - elapsed_time, ' from ', time.time())
            # Wait out the remainder of the window before reconnecting.
            time.sleep(window_seconds - elapsed_time)
            requests = 0
            start_time = time.time()
            myStream = tweepy.Stream(auth=api.auth, listener=self)
            # ``async`` is a reserved word since Python 3.7; Tweepy 3.7+
            # names this parameter ``is_async``.
            myStream.filter(locations=[-78, 40, -77, 41], is_async=True)

    def on_error(self, status_code):
        """Print *status_code* and return False when it is 420.

        Any other status code falls through and returns None.
        NOTE(review): per the Tweepy streaming docs, returning False
        disconnects the stream -- confirm for the installed version.
        """
        print(status_code)
        if status_code == 420:
            return False


# Create the listener and the stream; both stay module-level names because
# the listener rebinds ``myStream`` when it pauses for the rate limit.
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)

print('Streaming Tweets from State College, PA area: ')

# ``async`` is a reserved word since Python 3.7, so ``async=True`` no longer
# parses; Tweepy 3.7+ names this parameter ``is_async``.
myStream.filter(locations=[-78, 40, -77, 41], is_async=True)