플링크의 역사가 아직 짧기 때문에 버전업이 꾸준히 그리고 빠르게 진행되고 있는것 같다.
덕분에 매번 새로운 버전마다 바뀌고 삭제되고 추가되는 메서드들이 굉장히 많다.
현재 플링크 정식 릴리즈 중 가장 최신 버전인 v1.15 부터는 TableEnviroment에서 execute() 메서드가 삭제되었다.
플링크 1.15v 릴리즈 docs중 execute메서드 삭제 관련 링크
**기타 내용 참고. 1.14 버전 이후 부터 레거시 플래너는 더 이상 지원하지 않는다고 한다. 그래서 버전업을 하게 되는 경우 use_blink_planner 같은 blink 관련 메서드들을 삭제해야 한다.
그래서 1.15버전 이후 부터는 기존 아래 코드를 어떻게 변경 해야 하는지 작성해 보았다.
# 파이썬
# 더이상 사용되지 않는 코드
t_env.execute()
플링크의 TableEnvironment에서 execute 메서드 삭제 이유
플링크를 이제 막 공부중에 있는데 느끼던 것이 있었다.
여러가지 환경들에 대한 모듈들을 굉장히 짜잘짜잘하게 여러 모듈들로 나뉘어져 있어서 불편하다는 것이었다.
삭제된 이유는 다음과 같다.
Motivation 4번 발췌한 내용중 일부.
기존에 StreamExecutionEnvironment 와 TableEnvironment가 나뉘어져 있고, 두 모듈 모두 execute 메서드를 실행 할 수 있었다.
그러나 TableEnvironment을 사용해 플링크 테이블 프로그램을 구축하는 경우 StreamExecutionEnvironment 의 인스턴스를 가져올 수 없기 때문에 TableEnvironment.execute()를 실행해야 한다.
혹은 StreamTableEnvironment를 사용해 플링크 테이블 프로그램을 구축하는 경우도 서로 다른 환경 둘 다 execute 메서드를 사용할 수 있다.
벌써 느껴지는 뭐가 뭔지 헷갈리는 불편함.
이런 경우 프로그램이 어떤 'execute'가 실행되고 있는지 불분명해진다는 것이다.
그래서 테이블 환경에서 execute 메서드는 삭제가 되었고, 대체 된 방법으로 간결하게 표현 할 수 있게 되었다.
TableEnvironment의 execute 메서드 삭제에 따른 대체 코드
*기존의 경우
# 기존 execute() 실행 방법
t_env.from_path("source").insert_into("sink")
t_env.execute("job name")
바뀐 대체 방법은 아래와 같다.
#
# 쿼리문으로 테이블을 생성하는 경우
#
# 환경 세팅 생략..
t_env = StreamTableEnvironment.create(env, envsettings) # 테이블 환경 세팅
# 소스 테이블을 만드는 쿼리
source_queury = """
CREATE TABLE source(
aaa STRING
bbb INT
) with (
'connector' = 'kafka'
생략..
)
"""
# sink 테이블을 만드는 쿼리
sink_queury = """
CREATE TABLE sink(
ccc STRING
ddd INT
) with (
'connector' = 'kafka'
생략..
)
"""
# 쿼리를 실행해 테이블을 생성
t_env.execute_sql(source_query)
t_env.execute_sql(sink_query)
# 아래 두 코드는 같은 내용이다. 둘 중 아무거나 선택해서 사용
t_env.from_path("source").execute_insert("sink").wait()
t_env.execute_sql("INSERT INTO sink SELECT * chapter FROM source").wait()
execute_insert 혹은 execute_sql 로 테이블 환경이라는 것을 직관적으로 알 수 있다.
주의점은 wait() 을 붙여주지 않으면 기존 execute 메서드 처럼 계속해서 실행중인 상태가 되지 않는다. wait()이 없으면 결과 값을 리턴해주기 때문이다.
데이터 스트림을 전달하는 과정에서도 크게 다르지 않다.
execute_insert() 메서드를 사용한 이후 wait()을 붙여주면 된다.
# 데이터 스트림이 올때 테이블을 만들고 insert 해준다.
table = t_env.from_data_stream(ds)
table.execute_insert("sink").wait()
아직 깊게 공부하지 않았기 때문에 잘못 알고 있는 내용이 있을 수 있습니다.
잘못된 점을 보시게 되면 답글 부탁드립니다.
'Development > Data Engineering' 카테고리의 다른 글
ETL이란? (0) | 2022.07.08 |
---|---|
tweepy 로 트위터 API V2 스트리밍 하기 (1) | 2022.05.25 |
[Docker] M1 맥 도커 confluentinc/cp-kafka 대체 이미지 (0) | 2022.04.28 |
[Apache Spark] 아파치 스파크 RDD란? (0) | 2022.04.19 |
[Apache Airflow] 아파치 에어플로우란? (0) | 2022.04.16 |
댓글