본문 바로가기
Development/Data Engineering

[Apache Flink] 플링크 v1.15 TableEnviroment execute() 메서드 삭제

by _KHK 2022. 5. 12.

플링크의 역사가 아직 짧기 때문에 버전업이 꾸준히 그리고 빠르게 진행되고 있는것 같다.

덕분에 매번 새로운 버전마다 바뀌고 삭제되고 추가되는 메서드들이 굉장히 많다.

 

현재 플링크 정식 릴리즈 중 가장 최신 버전인 v1.15 부터는 TableEnviroment에서 execute() 메서드가 삭제되었다.

플링크 1.15v 릴리즈 docs중 execute메서드 삭제 관련 링크

 

Release Notes - Flink 1.15

Release notes - Flink 1.15 # These release notes discuss important aspects, such as configuration, behavior, or dependencies, that changed between Flink 1.14 and Flink 1.15. Please read these notes carefully if you are planning to upgrade your Flink versio

nightlies.apache.org

**기타 내용 참고. 1.14 버전 이후 부터 레거시 플래너는 더 이상 지원하지 않는다고 한다. 그래서 버전업을 하게 되는 경우 use_blink_planner 같은 blink 관련 메서드들을 삭제해야 한다.

 

 

그래서 1.15버전 이후 부터는 기존 아래 코드를 어떻게 변경 해야 하는지 작성해 보았다.

# 파이썬
# 더이상 사용되지 않는 코드
t_env.execute()

 


 

플링크의 TableEnvironment에서 execute 메서드 삭제 이유

플링크를 이제 막 공부중에 있는데 느끼던 것이 있었다.

여러가지 환경들에 대한 모듈들을 굉장히 짜잘짜잘하게 여러 모듈들로 나뉘어져 있어서 불편하다는 것이었다.

 

삭제된 이유는 다음과 같다.

Motivation 4번 발췌한 내용중 일부.

 

기존에 StreamExecutionEnvironmentTableEnvironment가 나뉘어져 있고, 두 모듈 모두 execute 메서드를 실행 할 수 있었다.

그러나 TableEnvironment을 사용해 플링크 테이블 프로그램을 구축하는 경우 StreamExecutionEnvironment 의 인스턴스를 가져올 수 없기 때문에 TableEnvironment.execute()를 실행해야 한다.

혹은 StreamTableEnvironment를 사용해 플링크 테이블 프로그램을 구축하는 경우도 서로 다른 환경 둘 다 execute 메서드를 사용할 수 있다.

 

벌써 느껴지는 뭐가 뭔지 헷갈리는 불편함.

 

이런 경우 프로그램이 어떤 'execute'가 실행되고 있는지 불분명해진다는 것이다.

그래서 테이블 환경에서 execute 메서드는 삭제가 되었고, 대체 된 방법으로 간결하게 표현 할 수 있게 되었다.

 

 


TableEnvironment의  execute 메서드 삭제에 따른 대체 코드

*기존의 경우

# 기존 execute() 실행 방법

t_env.from_path("source").insert_into("sink")
t_env.execute("job name")

 

바뀐 대체 방법은 아래와 같다.

#
# 쿼리문으로 테이블을 생성하는 경우
#

# 환경 세팅 생략..

t_env = StreamTableEnvironment.create(env, envsettings) # 테이블 환경 세팅 

# 소스 테이블을 만드는 쿼리
source_queury = """
	CREATE TABLE source(
    	aaa STRING
        bbb INT
    ) with (
    	'connector' = 'kafka' 
         생략..
    )    
"""

# sink 테이블을 만드는 쿼리
sink_queury = """
	CREATE TABLE sink(
        ccc STRING
        ddd INT
    ) with (
    	'connector' = 'kafka' 
         생략..
    )    
"""

# 쿼리를 실행해 테이블을 생성
t_env.execute_sql(source_query)
t_env.execute_sql(sink_query)

# 아래 두 코드는 같은 내용이다. 둘 중 아무거나 선택해서 사용
t_env.from_path("source").execute_insert("sink").wait()
t_env.execute_sql("INSERT INTO sink SELECT * chapter FROM source").wait()

execute_insert 혹은 execute_sql 로 테이블 환경이라는 것을 직관적으로 알 수 있다.

주의점은 wait() 을 붙여주지 않으면 기존 execute 메서드 처럼 계속해서 실행중인 상태가 되지 않는다. wait()이 없으면 결과 값을 리턴해주기 때문이다.

 

데이터 스트림을 전달하는 과정에서도 크게 다르지 않다.

execute_insert() 메서드를 사용한 이후 wait()을 붙여주면 된다.

# 데이터 스트림이 올때 테이블을 만들고 insert 해준다.
table = t_env.from_data_stream(ds) 
table.execute_insert("sink").wait()

 

 

 

 

아직 깊게 공부하지 않았기 때문에 잘못 알고 있는 내용이 있을 수 있습니다. 

잘못된 점을 보시게 되면 답글 부탁드립니다.

댓글