The acquisition code is split into a few kinds of objects:
`acq.py`
: driver program.
    - Creates a tweet Collector in search or stream mode.
    - As the only user, I decided not to expose the query terms and options as parameters, but to modify them in place. This should be revisited when the code is reused.
    - Currently, that means the tags to look for, the language, and the timeframe are set in this source file (see the sketch below).
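A minimal sketch of what those in-source settings might look like; the variable names and values here are purely illustrative, not the ones actually used in `acq.py`:

```python
# Hypothetical in-source configuration for acq.py (names and values are
# placeholders; the real driver defines its own).
MODE = "search"                             # or "stream"
TAGS = ["#example", "#anothertag"]          # hashtags to collect
LANG = "en"                                 # tweet language filter
DATE_RANGE = ("2015-01-01", "2015-01-07")   # timeframe for search mode
```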
`credentials.py`
: easy access to credentials in config files (sketched below).
    - Twitter credentials are read from `~/.tweepy`.
    - AWS credentials are read from `~/aws/credentials`.
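A sketch of how this credential loading might work, assuming INI-style files; the section and key names are assumptions, not the actual layout of `~/.tweepy`:

```python
import configparser
import os

def load_twitter_credentials(path="~/.tweepy"):
    """Read Twitter API credentials from an INI-style config file.

    The section and key names here are illustrative; the real file
    layout may differ.
    """
    cfg = configparser.ConfigParser()
    cfg.read(os.path.expanduser(path))
    section = cfg["twitter"]
    return (section["consumer_key"], section["consumer_secret"],
            section["access_token"], section["access_token_secret"])

def load_aws_credentials(path="~/aws/credentials", profile="default"):
    """Read AWS credentials from an AWS-credentials-style file."""
    cfg = configparser.ConfigParser()
    cfg.read(os.path.expanduser(path))
    section = cfg[profile]
    return section["aws_access_key_id"], section["aws_secret_access_key"]
```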
`collector.py`
: talks to Twitter.
    - The Collector gets data from Twitter using either the search or the stream API.
    - It controls the pathnames of the rolling log files via the sink-creation callback `makeSink`. This should move to the interface. This is currently where the S3 AWS credentials are used.
    - When streaming, it uses the `EmittingListener` in `listeners.py` as its listener.
    - The Collector then passes each tweet to a Facet.
        - The facet is invoked as a context manager using `contextlib.closing()`, guaranteeing that the facet's `close` method will be called even in case of exception. This causes any in-progress data to be written to file(s). (See the sketch after this list.)
        - It passes the Facet a callback for creating a sink from a key. This is kind of ugly, and the decisions of which S3 bucket to use and what the file path format string is should be exposed in the interface.
    - As a convenience, I added a `progress.counter` subclass that shows a progress bar when using the search API for a specific date range, and a tweet counter otherwise.
    - The last `id` and `created_at` timestamp are recorded. In certain retry cases, if the last ID is set, it is sent to the API endpoint as a max or since ID.
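A rough sketch of the search-mode flow described above; apart from `contextlib.closing()`, the class and method names are paraphrased from this description rather than taken from the code:

```python
import contextlib

class Collector:
    """Sketch: fetch tweets, hand each to the facet, and guarantee the
    facet is closed (flushing in-progress data) even on error."""

    def __init__(self, api, facet, make_sink):
        self.api = api              # Twitter client (e.g. a tweepy API object)
        self.facet = facet          # decides whether/where to write each tweet
        self.make_sink = make_sink  # callback: key -> sink (path/S3 decisions)
        self.last_id = None         # last seen tweet id, used on retries
        self.last_created_at = None

    def collect_search(self, query, since_id=None, max_id=None):
        # closing() calls facet.close() on the way out, even if an
        # exception is raised mid-collection.
        with contextlib.closing(self.facet) as facet:
            # api.search is assumed to yield statuses for the query,
            # honoring since_id/max_id on retries.
            for status in self.api.search(q=query, since_id=since_id,
                                          max_id=max_id):
                facet.process(status, self.make_sink)
                self.last_id = status.id
                self.last_created_at = status.created_at
```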
`listeners.py`
: stream listener used in the Collector.
    - The most interesting bit here is the `pysigset.suspended_signals` context manager (see the sketch below).
        - This wraps the code that sends a tweet from `on_status` to the appropriate sink.
        - It ensures that external signals (specifically, SIGINT and SIGTERM) are only delivered after the sink write operations finish, so the sinks are in a consistent state to be closed when the Facet is closed.
        - For the search API, there is analogous code in the Collector. It should probably be moved to its own class to make it easier to write new collectors, possibly shared with the listener.
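A minimal sketch of that signal-masking pattern; the `tweepy.StreamListener` base class (older tweepy releases) and the facet wiring are assumptions, with only the use of `pysigset.suspended_signals` around the `on_status` work taken from the description above:

```python
import signal
import pysigset
import tweepy

class EmittingListener(tweepy.StreamListener):
    """Sketch: forward each status to the facet with SIGINT/SIGTERM
    held until the sink write has completed."""

    def __init__(self, facet, make_sink):
        super().__init__()
        self.facet = facet
        self.make_sink = make_sink

    def on_status(self, status):
        # Defer SIGINT/SIGTERM so an interrupt cannot cut a write short
        # and leave the sink inconsistent; pending signals are delivered
        # once the block exits.
        with pysigset.suspended_signals(signal.SIGINT, signal.SIGTERM):
            self.facet.process(status, self.make_sink)
        return True  # keep the stream running
```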
`facets.py`
: decides whether or not to process a tweet, and what Sink(s) to send it to.
    - Right now, there is only one, `FilteringFacet`, which uses a `RegexpMatcher` to identify messages with particular tags and to compose the file paths for the corresponding sink (see the sketch below).
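A sketch of what that might look like; the method names on the facet and matcher are inferred from this write-up, not taken from the code:

```python
class FilteringFacet:
    """Sketch: route a tweet to a per-key sink when the matcher finds tags."""

    def __init__(self, matcher):
        self.matcher = matcher
        self.sinks = {}  # key -> sink, created lazily via the callback

    def process(self, tweet, make_sink):
        # The matcher is assumed to work on the tweet text here.
        key = self.matcher.check(tweet.text)
        if key is None:
            return  # no matching tags: drop the tweet
        sink = self.sinks.setdefault(key, make_sink(key))
        sink.write(tweet)

    def close(self):
        # Flush and close every sink that was opened.
        for sink in self.sinks.values():
            sink.close()
```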
`matchers.py`
: policy class for the `FilteringFacet`.
    - This has one method, `check`, which receives a tweet. If the tweet matches the regexp, all matches are transformed to lowercase, deduped, and joined with underscores as the delimiter to form the key it returns. If there are no matches, it returns `None` (see the sketch below).
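A sketch of that policy, shown here operating on the tweet text and with an assumed constructor; only the lowercase/dedupe/underscore-join behaviour is taken from the description:

```python
import re

class RegexpMatcher:
    """Sketch of check(): return a key built from the matched tags,
    or None when nothing matches."""

    def __init__(self, pattern):
        self.regexp = re.compile(pattern)

    def check(self, text):
        matches = self.regexp.findall(text)
        if not matches:
            return None
        # Lowercase, dedupe (preserving first-seen order), and join with
        # underscores to form the sink key.
        seen = []
        for m in (m.lower() for m in matches):
            if m not in seen:
                seen.append(m)
        return "_".join(seen)
```

For example, with the pattern `#\w+`, `check("Loving #Python and #python")` would return `"#python"`.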
`sinks.py`
: a variety of somewhat ad hoc, somewhat composable policies for writing tweets to storage.
    - These should be made more nicely composable, but that can wait until it's needed.
    - In this app, the top-level sink is a `RollingSink`, which rolls files based on the caller-provided record limit and path format string.
    - It wraps a `RecordSink`, which writes the records passed to it with an opening square bracket, commas at the ends of records, and a closing square bracket, so that JSON records written to it form a well-formed array (see the sketch below).
    - That sits on top of either a `FileSink` or an `S3Sink`, which write to the local filesystem or to S3, respectively.
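A sketch of how that bracket-and-comma layer could work; the constructor and the `write`/`close` method names on the wrapped sink are assumptions:

```python
import json

class RecordSink:
    """Sketch: wrap a lower-level sink so the records written through it
    form one well-formed JSON array."""

    def __init__(self, inner):
        self.inner = inner  # e.g. a FileSink- or S3Sink-like object
        self.count = 0

    def write(self, record):
        # Open the array before the first record; separate later records
        # with commas.
        prefix = "[" if self.count == 0 else ","
        self.inner.write(prefix + json.dumps(record))
        self.count += 1

    def close(self):
        # Close the array (emit an empty array if nothing was written).
        self.inner.write("[]" if self.count == 0 else "]")
        self.inner.close()
```

Presumably the `RollingSink` constructs a fresh `RecordSink` over a new `FileSink` or `S3Sink` each time it rolls to a new path.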
- The processing code is in `proc.ipynb`, but the Python code is extracted into `proc.py` and the output into `proc.md` and auxiliary files.
- For expediency, I downloaded the files from S3 to a subdirectory `tweets` and processed them locally. For larger data sets I would likely leave them on S3 and refactor the code to process them using `mrjob`.
For each tag, I read in and JSON-decode the files for that tag in parallel. The tweets from each file are tokenized using my own regexp, which matches:
- http or https URLs
- mentions and hashtags, including the octothorpe or at sign
- sequences of alphanumeric characters plus apostrophes
The last category corresponds to actual words, so those tokens are downcased and passed to `nltk.EnglishStemmer`. The others are just downcased (including URLs, which probably should not be). Since the stemmer will turn punctuation into standalone tokens, single-character tokens are skipped unless they are alphanumeric. If a token is in the `nltk` stopwords list, it is also skipped. Finally, empty tokens are skipped. At this point, the token is added to a `FreqDist` (one per file being processed). Once all tweets in a file are processed, the `FreqDist` is returned, and `Parallel` presents the `FreqDist`s to the caller as a sequence. I loop over these, updating a `ConditionalFreqDist` using the tag as the condition. After processing all the tags, I use each condition/tag's `FreqDist` to make an overall `FreqDist` with condition `'all'`, and finally loop over the tags and plot each `FreqDist`. A condensed sketch of this pipeline follows.
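This sketch assumes one JSON array of tweets per file; the exact regexp, helper names, and field access differ from `proc.py`, and the stemmer is imported from `nltk.stem.snowball`:

```python
import json
import re
from nltk import FreqDist, ConditionalFreqDist
from nltk.corpus import stopwords
from nltk.stem.snowball import EnglishStemmer
from joblib import Parallel, delayed

# One alternation per token class: URLs, mentions/hashtags, word-like runs.
TOKEN_RE = re.compile(r"https?://\S+|[@#]\w+|[\w']+")
STEMMER = EnglishStemmer()
STOPWORDS = set(stopwords.words("english"))

def tokenize(text):
    for tok in TOKEN_RE.findall(text):
        tok = tok.lower()
        if not tok.startswith(("http", "@", "#")):
            tok = STEMMER.stem(tok)          # stem only word-like tokens
        if len(tok) == 1 and not tok.isalnum():
            continue                         # stray punctuation from stemming
        if not tok or tok in STOPWORDS:
            continue
        yield tok

def file_freqdist(path):
    """Build one FreqDist per file of JSON-encoded tweets."""
    fd = FreqDist()
    with open(path) as f:
        for tweet in json.load(f):
            fd.update(tokenize(tweet["text"]))
    return fd

def build_cfd(files_by_tag):
    """files_by_tag: dict mapping tag -> list of JSON file paths."""
    cfd = ConditionalFreqDist()
    for tag, paths in files_by_tag.items():
        for fd in Parallel(n_jobs=-1)(delayed(file_freqdist)(p) for p in paths):
            cfd[tag].update(fd)
            cfd["all"].update(fd)  # overall distribution, folded in as we go
    return cfd
```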
The tweets are stored in the bucket `s3://nkrishna-mids205-hw2`.
The frequency plots are the PNG files associated with the notebook (see `proc.py`) and are included below: