트위터의 API V2가 추가된 지 어느 정도 시간이 흘렀고, tweepy가 올 22년 하반기부터 본격적으로 V2 위주로 업데이트가 될 것으로 보인다. 그래서 tweepy를 이용한 기존의 Stream 메서드들이 deprecate 예정이고, 따라서 V2 위주의 코드 업데이트가 필요하다.
단순한 Stream 데이터 연습을 하기위해서 트위터 api를 사용해보려다가 V1.1과 V2에서 오랜 시간 삽질을 했다.
그 덕분에 V1.1과 V2가 어떤 차이가 있는지 대강 가늠할 수 있게 되었다.
그래서 본 게시물은 둘이 어떤 차이점이 있는지, V2를 이용해 tweepy로 스트림하는 방법을 작성하려고 한다.
Twitter API V1.1 과 V2의 차이점
tweepy로 스트림 코드를 작성하기 전에 V1.1 과 V2의 차이점을 알아야 한다.
둘의 가장 큰 차이점은 두가지가 있다.
- 스트림 클라이언트가 생긴 것
- 필터링이 간편해지고 필터링의 범위가 넓어졌다는 것
1. StreamClient
공식적으로 tweepy 4.9 버전부터 Stream 클래스를 사용하지 않고, StreamClient 클래스를 사용한다.
기존 API 1.1에서는 tweepy의 Stream 클래스를 인스턴스 하고, fliter 메서드를 사용해 스트림을 실행했다.
그리고 Stream 클래스를 인스턴스 하기 위해서는 트위터 개발자 계정 API Key가 4가지가 있어야 했다.
consumer key, consumer secret key, access token, access token secret
V2를 이용할 때는 Stream 클래스를 사용하지 않고 대신에 StreamClient 클래스를 사용한다.
클라이언트에서 V2의 엔드포인트로 스트림 규칙을 설정하고, 추가하고 요청하는데 사용한다.
StreamClient 클래스를 인스턴스 하기 위해서는 위에서 나열한 4가지의 api키가 아닌 BEARER TOKEN 키 하나만 필요하다.
2. 필터링의 범위 확장
API 1.1을 사용한 tweepy의 Stream.fliter 메서드는 기본적으로 필터링이 되지 않은 채로 수많은 key를 가진 많은 양의 데이터가 들어온다. 하지만 API 2를 사용하는 tweepy의 StreamClient.filter 메서드는 가장 기본적인 데이터만 들어온다.
이렇게 API v2는 가장 기본적인 데이터만 들어오지만 expansion 파라미터가 생겼고, 내가 필요한 데이터를 확장해서 가지고 오는 것이다.
둘의 차이점 두가지를 요약하면 다음과 같다.
API 1.1은 Stream.filter 메서드를 사용해 스트림 요청을 하고, 필터링의 범위가 작았다.
API 2는 StreamClient.filter 메서드를 사용해 기본적인 데이터에서 필요한 데이터를 확장 요청해서 가지고 온다.
tweepy로 트위터 API V2 스트리밍
먼저 간단하게 api v1.1을 사용하는 tweepy의 Stream.filter 코드는 아래와 같다.
# tweepy < 4.9
# twitter API V1.1
from twitter_api import (
CONSUMER_KEY,
CONSUMER_SECRET,
ACCESS_TOKEN,
ACCESS_TOKEN_SECRET,
)
import tweepy
# tweepy.Stream 클래스를 상속받는 클래스
class TwitterStream(tweepy.Stream):
def on_data(self, raw_data):
print(raw_data) # 스트림 데이터 프린트
# 스트림 인스턴스 생성
stream = TwitterStream(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
# 스트림 시작
stream.filter(track=["Twitter"])
트위터 API V2를 사용해 스트림 하는 기본적인 코드
# tweepy >= 4.9
# twitter API V2
from twitter_api import (
BEARER_TOKEN
)
import tweepy
# tweepy.StreamClient 클래스를 상속받는 클래스
class TwitterStream(tweepy.StreamingClient):
def on_data(self, raw_data):
print(raw_data)
# 스트림 클라이언트 인스턴스 생성
client = TwitterStream(BEARER_TOKEN)
# 스트림 규칙 추가
client.add_rules(tweepy.StreamRule(value="Twitter"))
# 스트림 시작
client.filter()
트위터 API V2 를 사용하는 tweepy의 StreamClient.filter 코드는 약간의 차이가 있다.
바로 필터 규칙을 설정해 주어야 하는데, 이 규칙들은 api v2의 엔드포인트에 남아있다.
따라서 새로운 필터링을 하기 위해서는 기존의 규칙들을 불러오고, 필요하지 않은 기존 규칙들을 제거하는 작업이 필요하다.
우선 규칙들을 불러오면 다음과 같은 tweepy.client.Response 타입으로 규칙을 불러온다.
rules = client.get_rules()
print(rules)
print(type(rules))
>>> Response(data=[StreamRule(value='Twitter', tag=None, id='1529086903898632193')], includes={}, errors=[], meta={'sent': '2022-05-25T07:13:41.435Z', 'result_count': 1})
>>> <class 'tweepy.client.Response'>
이 Response의 data는 규칙의 정보를 담고 있는 리스트다. 규칙들은 value, tag, id를 가지고 있고, 규칙을 추가할 때 설정할 수 있다. id값을 따로 설정하지 않으면 자동으로 설정된 값이 저장된다.
# 규칙이 여러개 있는 경우 출력 방식
print(rules.data)
>>> [StreamRule(value='Twitter', tag=None, id='1529086903898632193'), StreamRule(value='BTS', tag=None, id='1529363211085963265')]
따라서 규칙을 제거하기 위해서는 스트림 규칙의 id 값이 필요하고, 이것을 활용해 모든 규칙을 제거 할 수 있다.
id 값들을 리스트에 담고 delete_rules(ids) 메서드로 기존의 규칙들을 모두 제거해준다.
# 기존의 규칙들을 제거 하는 함수
def delete_all_rules(rules):
# 규칙 값이 없는 경우 None 으로 들어온다.
if rules is None or rules.data is None:
return None
stream_rules = rules.data
ids = list(map(lambda rule: rule.id, stream_rules))
client.delete_rules(ids=ids)
전체 코드 흐름은 다음과 같다.
# tweepy >= 4.9
# twitter API V2
from twitter_api import (
BEARER_TOKEN
)
import tweepy
# tweepy.StreamClient 클래스를 상속받는 클래스
class TwitterStream(tweepy.StreamingClient):
def on_data(self, raw_data):
print(raw_data)
# 규칙 제거 함수
def delete_all_rules(rules):
# 규칙 값이 없는 경우 None 으로 들어온다.
if rules is None or rules.data is None:
return None
stream_rules = rules.data
ids = list(map(lambda rule: rule.id, stream_rules))
client.delete_rules(ids=ids)
# 스트림 클라이언트 인스터턴스 생성
client = TwitterStream(BEARER_TOKEN)
# 모든 규칙 불러오기 - id값을 지정하지 않으면 모든 규칙을 불러옴
rules = client.get_rules()
# 모든 규칙 제거
delete_all_rules(rules)
# 스트림 규칙 추가
client.add_rules(tweepy.StreamRule(value="#BTS has:images"))
# 스트림 시작
client.filter()
스트림 규칙은 문자열 형태로 전달해 주어야 하는데 쿼리문처럼 문법이 있다.
이 문법은 공식 docs에서 자세히 알려주고 있고, 그리 복잡하지는 않다.
스트림이 출력되는 모습
여기서부터는 V2에서 제공하는 expension을 활용한 확장 검색 방법이다.
expansion은 여러 필드가 나뉘어 있고, 이 필드를 추가함으로써 내가 원하는 데이터를 추가로 얻거나 필터링할 수 있다.
여러 필드에 대한 정보는 아래 expansions링크나, Data dictionary 링크를 참고하면 된다.
예를 들어 한국어로 된 트위터만 검색하고, 날짜를 추가로 받아오고 싶다. 이때 내가 필요한 필드는 tweet_field이다.
client.filter(tweet_fields=["lang", "created_at"])
참고로 language 필드는 어떤 언어가 사용됐는지만 알려준다.
맨 처음 클래스를 정의할 때 on_date 메서드에서 추가로 필터링해 한국어 정보를 받아 올 수 있을 것이다.
'Development > Data Engineering' 카테고리의 다른 글
ETL이란? (0) | 2022.07.08 |
---|---|
[Apache Flink] 플링크 v1.15 TableEnviroment execute() 메서드 삭제 (0) | 2022.05.12 |
[Docker] M1 맥 도커 confluentinc/cp-kafka 대체 이미지 (0) | 2022.04.28 |
[Apache Spark] 아파치 스파크 RDD란? (0) | 2022.04.19 |
[Apache Airflow] 아파치 에어플로우란? (0) | 2022.04.16 |
댓글