본문 바로가기
Project

[프로젝트] 뉴진스 트윗 클러스터링 #2 프로그래밍

by nothing-error 2023. 1. 25.

1. 필요한 라이브러리

%sh
pip install tweepy kafka-python

트위터 API를 편하게 사용하기 위해 tweepy 랑 파이썬으로 kafka를 다루기 위해 kafka-python을 설치.

 

2. Kafka에 트윗 데이터 전송

 

%spark_yarn.pyspark

# 샘플코드 : https://github.com/twitterdev/Twitter-API-v2-sample-code
import tweepy
from kafka import KafkaProducer
from mykeys import Mykeys

producer = KafkaProducer(bootstrap_servers='spark-master-01')

# tweepy.StreamClient 클래스를 상속받는 클래스
class TwitterStream(tweepy.StreamingClient):

    def on_data(self, raw_data):
        producer.send('tweet', raw_data)
        print(raw_data.decode('utf-8'))

# 규칙 제거 함수
def delete_all_rules(rules):
    # 규칙 값이 없는 경우 None 으로 들어온다.
    if rules is None or rules.data is None:
        return None
    stream_rules = rules.data
    ids = list(map(lambda rule: rule.id, stream_rules))
    client.delete_rules(ids=ids)
    
    
#sample code : https://github.com/twitterdev/Twitter-API-v2-sample-code
#rules : https://docs.tweepy.org/en/stable/streamrule.html#tweepy.StreamRule
#query : https://developer.twitter.com/en/docs/twitter-api/tweets/search/integrate/build-a-query
def main():
    client = TwitterStream( bearer_token=Mykeys.bearer_token())
    rules = client.get_rules()
    delete_all_rules(rules)
    
    client.add_rules(tweepy.StreamRule(value="NewJeans"))
    client.filter(tweet_fields=["lang", "created_at"])
    
if __name__ == "__main__":
    main()

api 토큰의 경우 트위터에서 별도 신청을 통해서 발급받아야 하며 약간 까다로운 편. 또한 v2 로 버전업 되면서 bearer_token 하나만 있으면 실시간 트윗 수집이 가능해졌다. 

https://devkhk.tistory.com/37 블로그 참고해서 코드를 약간 수정하였다. 또한 rules 랑 query 작성방법은 주석에 있는 주소에 들어가면 확인 가능.

코드에 대해서 간단하게 설명하면, TwitterStream 클래스로 tweepy.StreamingClient 를 상속해서 client 를 생성해 주는데 이때 client에는 트위터의 모든 rule이 다 들어가 있다. 필요 없는 rule이 대부분 이므로 모든 rule을 제거한다. add_rules로 StreamRule 만 추가하는데 이때 value에 수집하고자 하는 키워드를 넣어주면 되고, 이때 이미지나 비디오 태그 등등을 설정가능하다. 그리고 filter로 필요한 부분만 가져오면 된다. 나의 경우 언어랑, 트윗 생성시간이 그나마 이후에 다른 프로젝트 할 때도 필요할 듯하여 추가해 놓았다.

https://docs.tweepy.org/en/stable/streamingclient.html?highlight=StreamingClient 

 

StreamingClient — tweepy 4.12.1 documentation

© Copyright 2009-2022, Joshua Roesslein. Revision cc8dd493.

docs.tweepy.org

 

데이터는 아래와 같이 수집된다. json 구조로 수집된다.

{"data":{"created_at":"2023-01-20T05:50:48.000Z","edit_history_tweet_ids":["1616312266508599296"],"id":"1616312266508599296","lang":"en","text":"RT @newjeans_loop: Lee Hyein got scared by fireworks 🥲\n\n#뉴진스 #NewJeans #혜인 #Hyein https://t.co/KVG1etABj6"},"matching_rules":[{"id":"1616308797726945280","tag":""}]}
{"data":{"created_at":"2023-01-20T05:50:49.000Z","edit_history_tweet_ids":["1616312269734019072"],"id":"1616312269734019072","lang":"ja","text":"newjeans カンヘリンイムニダ〜😺\n#キラパラ\n#きらめきパラダイス\n#NewJeans\n#HEARIN https://t.co/Au30aQZQEk"},"matching_rules":[{"id":"1616308797726945280","tag":""}]}
{"data":{"created_at":"2023-01-20T05:50:49.000Z","edit_history_tweet_ids":["1616312269905985537"],"id":"1616312269905985537","lang":"th","text":"@BubblesRianbow สนของแถม กับ รวมวงครับ"},"matching_rules":[{"id":"1616308797726945280","tag":""}]}
{"data":{"created_at":"2023-01-20T05:50:50.000Z","edit_history_tweet_ids":["1616312275866116097"],"id":"1616312275866116097","lang":"fi","text":"RT @NewJeansGlobal: [#UPDATE] NewJeans x Musinsa \n\n#NewJeans #뉴진스 \n@NewJeans_ADOR https://t.co/8K1Myy6OnB"},"matching_rules":[{"id":"1616308797726945280","tag":""}]}
{"data":{"created_at":"2023-01-20T05:50:51.000Z","edit_history_tweet_ids":["1616312276533010432"],"id":"1616312276533010432","lang":"en","text":"NewJeans (뉴진스) 'Ditto' Performance Video https://t.co/3tFVEB5gPy"},"matching_rules":[{"id":"1616308797726945280","tag":""}]}

 

 3. 카프카 스트리밍 데이터 로드

 

%spark_yarn.pyspark
df_stream_kafka = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "spark-master-01:9092,spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092")\
    .option("subscribe", "tweet")\
    .load()

df_stream_kafka.printSchema()

print(df_stream_kafka.isStreaming)

spark에서 카프카 읽을 때에는 일반적인 read가 아닌 readStream을 사용하면 된다.

마지막 줄에 isStreaming 함수를 통해서 현재 데이터가 스트리밍 데이터인지 아닌지를 구분할 수 있다. True 나오면 스트리밍 데이터이다.

 

스키마를 확인해 보면 토픽이름과, key, value 등이 있으며  value 부분에 json 형식으로 우리가 수집한 트윗이 있다.

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

 

4. 트윗데이터 전처리

 

%spark_yarn.pyspark
df_stream_value = df_stream_kafka\
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
    .select("value")
    
df_stream_tweet = df_stream_value\
    .filter("value is not null")\
    .filter("value <> ''")\
    .toDF("tweet")

df_stream_tweet.printSchema()
df_stream_tweet.isStreaming

Spark에서 selectExpr을 사용하면 SQL 쿼리 작성하듯이 사용할 수 있다. 타입을 string으로 변경한 다음 필요한 value부분만 select 하고 filter를 통해서 null이나 공백이 아닌 부분만 필터링한다.

 

5. Warehouse에 트윗 스트림데이터 저장

table_name="tweet_table_1"

query_df_stream_tweet_table_text = df_stream_tweet\
    .writeStream\
    .trigger(processingTime='4 seconds') \
    .outputMode("append")\
    .option("checkpointLocation", f"hdfs://spark-master-01:9000/checkpoint/structured_streaming/{table_name}")\
    .queryName("query_df_stream_tweet_table_text")\
    .toTable(table_name)

readStream처럼 저장할 땐 writeStream을 사용한다. stream 데이터이기 때문에 n초마다 append 되도록 설정하였고  체크포인트의 경우 스트리밍 데이터를 처리하다가 장애가 발생했을 때 체크포인트를 통해서 수월하게 복구가 가능하다. 

 

 

stream 데이터가 잘 들어오는지 확인하려면 refreshTable을 통해서 table을 갱신 후 count를 통해서 데이터가 늘어나는 것을 확인할 수 있다.

spark.catalog.refreshTable(table_name)
spark.read.table(table_name).count()

 

6. 테이블 확인

tweet에 json 타입으로 한번에 수집되어있기 때문에 이것을 key 값들을 데이터프레임 형태로 변경이 필요하다.

 

 

 

 

7. json  파싱

 

방법 1. 판다스로 json을 데이터프레임으로 변경(spark로 방법을 찾지 못해서 익숙한 판다스 사용) 

%spark_yarn.pyspark
import json
pdf = tweet_table.toPandas()

lang=pdf['tweet'].apply(lambda x: json.loads(x)['data']['lang'])
created_at=pdf['tweet'].apply(lambda x: json.loads(x)['data']['created_at'])
text=pdf['tweet'].apply(lambda x: json.loads(x)['data']['text'])
pdf_tweet=pd.DataFrame({
    "created_at":created_at,
    "lang":lang,
    "text":text
})
df_tweet=spark.createDataFrame(pdf_tweet)
df_tweet.show(4)

먼저, 판다스 데이터프레임으로 변경한 다음. lambda apply로 필요한 부분만 직접 파싱 해서 새로운 데이터프레임을 생성하고 다시 스파크 데이터프레임으로 변경하는 코드다. 상당히 번거로운 방법이지만 별다른 방법을 찾지 못했을 때 궁여지책으로 사용하였다.

 

 

방법 2. 데이터프레임을 RDD로 변경하여 파싱

%spark_yarn.pyspark
rdd = tweet_table.rdd
rdd2 = rdd.map(lambda row: str(row["tweet"]))

parsed_tweet_table = spark.read.json(rdd2)
parsed_tweet_table.printSchema()
parsed_tweet_table.cache()

read.json(path)의 경우 path 에는 json 타입의 파일을 읽는 데 사용되기도 하지만(그래서 처음 찾아봤을 때 무심코 넘겨버림) read.json(rdd) 형태로도 사용할 수 있다. 때문에 tweet_table을 먼저 rdd로 변환한 다음에 json을 읽으면  아래처럼 깔끔하게 파싱 된다.

root
 |-- data: struct (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- edit_history_tweet_ids: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- id: string (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- text: string (nullable = true)
 |-- matching_rules: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- tag: string (nullable = true)

 

 

8. TempView 등록 후 SQL 집계

%spark_yarn.pyspark

df_tweet.createOrReplaceTempView("tweet_tempview")
tweet_groupby_lang=spark.sql("""
SELECT
    lang,
    count(lang) as lang_cnt
FROM tweet_tempview
GROUP BY lang
ORDER BY lang_cnt desc
LIMIT 10;
""")
z.show(tweet_groupby_lang)


#und : undermine(언어감지 못함)
#th : 태국어
#ja : 일본
#in : 인도네시아

 

tempview를 이용하면 어느 정도 익숙한 sql 문을 사용해서 데이터를 살펴볼 수 있다.

뉴진스의 트윗을 언어별로 살펴보면 영어가 가장 많고, 한국어, 태국어, 일본어 순으로 많다. (트윗에서 언어감지를 못했을 경우에 und로 분류)

 

 

데이터프레임으로도 아래와 같이 sql과 동일하게 집계가 가능하다.

from pyspark.sql.functions import lit
df_all_tweet_agg=df_tweet\
    .withColumn("lang_cnt" , lit(1))\
    .groupBy("lang")\
    .agg({"lang_cnt": "sum"})\
    .orderBy("sum(lang_cnt)",ascending=False)\
    .limit(10)
z.show(df_all_tweet_agg)

 

 

9. 머신러닝 모델링(K-means) 언어별 군집화

트위터에서는 자체적으로 트윗내용을 언어별로 구분해 놓고 있다. 이  lang을 label로 설정해서 지도학습으로 모델을 학습하는 방법도 있으나 그래도 트위터에서 한 언어별 구분이 완벽하지는 않을 것이기에 자체적인 군집화를 진행한다.

 

 

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

@udf(ArrayType(StringType()))
def n_gram(text, n):
    return [text[i:i+n] for i in range(len(text)-(n-1))]
    
spark.udf.register("n_gram", n_gram)

먼저 n-gram 함수를 만들어서 udf 등록을 한다. n-gram은 문장을 구성하는 단어들의 연속 조합을 말하며 여기서는 텍스트의 길이로(n=2) 자른다( hellow world  -> he/ll/ow/ w/or/ld)

 

 

이렇게 잘라진 트윗 내용에 HashingTF를 적용한다. HashingTF는 단어들을 해시 값으로 변환하여 각 단어들의 빈도수를 기록한다. 이렇게 구한 해시 값들을 이용하여 벡터를 구성할 수 있다.

from pyspark.ml.feature import HashingTF

hashingTF = HashingTF(inputCol="terms", outputCol="features", numFeatures=140) #트윗 글자수 280제한 
data_df_ngram_HTF = hashingTF.transform(data_df_ngram)

data_df_ngram_HTF.show(5, truncate=False)

 

 

위의 내용들을 종합하면 아래와 같다.

%spark_yarn.pyspark
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import HashingTF
from pyspark.ml.clustering import KMeans, KMeansModel

tweet_table="tweet_table_1"
model_path="hdfs://spark-master-01:9000/dahy/model/kmeans"
num_cluster=lang_num
num_iteration=100


#DW에 저장된 Tweet 읽은 후 RDD로 
spark.catalog.refreshTable(tweet_table)
spark.table(tweet_table).printSchema()
print(">>>> tweet_table data count: " + str(spark.table(tweet_table).count()) + "\n")
rdd_tweet = spark \
    .table(tweet_table) \
    .rdd \
    .map(lambda x: str(x["tweet"]))
tweet_df=spark.read.json(rdd_tweet)
tweet_data_rdd=tweet_df.select("data").rdd
tweet_data_rdd2=tweet_data_rdd.map(lambda x:x["data"])
tweet_data_df=spark.createDataFrame(tweet_data_rdd2)


#캐시된 DataFrame으로 Temp View 생성
tweet_data_df.createOrReplaceTempView("df_tweet")

#text 추출
tweet_text = tweet_data_df.select('text')
tweet_text.printSchema()
tweet_text.show(5, truncate=False)

#n-gram 처리
n = 2
tweet_text_term = tweet_text.withColumn("terms", n_gram(tweet_text['text'], lit(n)))
tweet_text_term.show(5, truncate=False)

#features 생성
hashingTF = HashingTF(inputCol="terms", outputCol="features", numFeatures=140)
tweet_text_term_htf= hashingTF.transform(tweet_text_term)

#--학습 데이터 캐시
tweet_text_term_htf.cache()
tweet_text_term_htf.count()
tweet_text_term_htf.show(5, truncate=False)
tweet_text_term_htf

 

10. 모델 학습 및 저장 캐시

kmeans = KMeans() \
    .setFeaturesCol("features") \
    .setPredictionCol("prediction") \
    .setK(num_cluster) \
    .setInitMode("k-means||") \
    .setMaxIter(num_iteration) \
    .setSeed(1234)
    
#학습
model = kmeans.fit(tweet_text_term_htf)
#모델저장
model.write().overwrite().save(model_path)  

#학습 데이터 캐시 해제
tweet_text_term_htf.unpersist()

모델 로드
model = KMeansModel.load(model_path)

 

 

11. 모델 로드 및 예측 결과

model = KMeansModel.load(model_path)

#예측
transformed_df= model.transform(tweets_feature_df_htf)
#클러스터별 결과
for num in range(num_cluster):
    print("\n>>>> CLUSTER {0}:".format(num))
    transformed_df.where(col('prediction') == num).select("lang", "text", "prediction").show(truncate=False)

 

일부 텍스트만 확인해 보면  첫 번째 열 th(태국어)는 트위터에서 구분한 값이고(≒정답) 맨 마지막 열의 숫자는 clustering 한 숫자 값으로 13번이 th로 보인다. 태국어만 봤을 땐 나름 괜찮게 학습이 된 것 같다. 


느낀 점

  • 대부분 문제의 해답은 공식문서에 있는 것 같다.
  • Spark의 Structured Streaming으로 streaming 데이터를 다루는데 데이터프레임을 다루는 것과 큰 차이를 못 느꼈다. 
  • 실시간 트윗을 바탕으로 실시간 대시보드를 만들어보는 것도 좋을 것 같다.

 

댓글