대규모 Tableau 데이터소스 처리, 30초면 충분했습니다
최근 준비 중인 POC에서 AI 에이전트가 Tableau의 데이터소스를 직접 Text-to-SQL로 쿼리할 수 있는지 검증하기 위해, 분석용 DB에 RAW 데이터를 적재해 보는 역할이 제게 떨어졌습니다.
그래서
그 과정을 정리해 봤습니다 :)
왜 DuckDB인가?
일단 왜 DuckDB를 선택했냐면.
- OLAP 최적화: 컬럼 기반 스토리지라 분석 쿼리에 딱 맞음 (Tableau의 .hyper 파일 역시 컬럼 기반이라 특성이 잘 맞았습니다)
- 임베디드: 서버 따로 안 띄워도 파일 하나로 동작 이게 정말 중요했습니다.
ParquetParquetHadoop 생태계의 컬럼 지향 압축 오픈 소스 파일 포맷으로, 데이터 분석 쿼리에 최적화되어 있습니다. 네이티브:CREATE TABLE AS SELECT * FROM read_parquet(...)한 줄로 적재 끝- 메모리 효율: 억 단위 행도 스트리밍으로 처리 가능
AI 에이전트가 쿼리하는 용도라면 빠른 집계가 중요하니까, OLAP에 최적화된 DuckDB가 자연스러운 선택이었습니다. 🐤
그리고 전체 파이프라인 구조는 이렇습니다.
TDSX 다운로드 → 압축 해제 → .hyper → Parquet 변환 → DuckDB 적재
↓ ↓
.tds 파싱 메타데이터 저장 (RDB)
(관계, 캡션, 계산 필드)
Phase 1: 순차 처리 (Baseline)
처음엔 구현이 먼저니까 아래와 같이 제일 단순하게 짰습니다.
for luid in datasource_luids:
tdsx = download(luid) # 네트워크 I/O
parquet = convert(tdsx) # CPU-bound
load_to_duckdb(parquet) # 디스크 I/O
결과는? 네, 당연히 느렸습니다. 이유는 생각보다 더 간단했는데요. 😅
- 41개나 되는 걸 하나씩 ‘줄 세워’ 처리하니 대기 시간이 끝도 없이 누적됨
- 중간에 460만 행짜리 ‘대어’가 하나 걸리면 나머지 전체가 그냥 무한 대기 이건 진짜 아니죠~~
- 네트워크 I/O 기다릴 땐 CPU가 놀고, CPU 쓸 땐 네트워크가 노는… 아주 비효율적인 상황ㅎㅎ
Phase 2: 병렬 파이프라인
핵심 아이디어
작업을 단계별로 분리해서, 각 단계의 특성에 맞는 동시성 전략을 적용했습니다.
| 단계 | 특성 | 선택한 동시성 |
|---|---|---|
| 다운로드 + 변환 | 네트워크 I/O + CPU | Semaphore(5) |
| DuckDB 적재 | Single-writer 제약 | asyncio.Queue 순차 |
DuckDB는 동시에 여러 writer가 쓰면 오히려 느려지거나 에러가 납니다. 그래서 다운로드/변환은 병렬로 달리되, 적재만 큐에서 꺼내 순차로 처리하는 구조를 택했습니다.
semaphore = asyncio.Semaphore(5) # 동시 5개 제한
load_queue = asyncio.Queue() # 적재 큐
async def process_luid(luid):
async with semaphore:
tdsx = await download(luid)
parquet = await asyncio.to_thread(convert, tdsx) # CPU-bound → 별도 스레드
await load_queue.put(parquet) # 변환 완료 즉시 큐에 넣기
async def load_worker():
while not done:
parquet = await load_queue.get()
await duckdb.load_parquet(parquet)
이때 최적화 포인트 몇 가지를 함께 적용했는데요.
- httpx 클라이언트 재사용: 매 요청마다 새 클라이언트 생성하면 커넥션 맺는 비용이 계속 발생합니다.
- 하나의
AsyncClient로 커넥션 풀 공유하도록 변경
- 하나의
- 즉시 큐 투입: 전체 변환이 끝날 때까지 기다리지 않고, 변환 완료된 것부터 바로 DuckDB 적재 시작
- 실패 격리: 하나가 실패해도 나머지는 계속 진행
- 임시 파일 정리: TDSX 원본은 변환 후 즉시 삭제, Parquet은 적재 후 삭제
Phase 3: Hyper 변환 최적화 (pantab)
What is pantab?
pantab는 Tableau의 .hyper 파일과 Python의 Pandas/Arrow 데이터 구조를 연결해주는 오픈소스 라이브러리입니다. 단순히 데이터를 읽어오는 기능을 넘어, Hyper SDK의 C++ 엔진을 직접 활용하여 대량의 데이터를 메모리 레벨에서 고속으로 변환할 수 있도록 설계되었습니다.
병렬화 후에도 병목 현상이 완전히 해결된 것은 아니더라고요! 변환 단계가 여전히 느렸습니다. 특히 460만 행짜리 데이터소스가 문제였어요 🤣
현재까지 구현된 기존 코드를 보면 이렇게 되어 있었습니다.
# tableauhyperapi (Python row-by-row)
with HyperProcess(...) as hyper:
with Connection(...) as conn:
with conn.execute_query("SELECT * FROM ...") as result:
while result.next_row(): # ← Python 루프
values = result.get_values() # ← 행 단위 읽기
for j, idx in enumerate(col_indices):
batch[j].append(convert(values[idx])) # ← 값 변환
C++ 엔진 위에 Python 루프를 올려놓은 건데, Python 루프가 C++ 속도를 다 죽이고 있던 거였습니다 🤫
여기서 pantab을 도입했습니다. pantab은 Hyper C++ API를 직접 바인딩해서 Python 루프 없이 Hyper → Arrow 메모리 포맷으로 바로 변환해 줍니다.
# pantab (C++ 직접 변환)
tables = pantab.frames_from_hyper(hyper_path, return_type="pyarrow")
for table_key, arrow_table in tables.items():
pq.write_table(arrow_table, parquet_path) # ← 끝
73줄짜리 변환 코드가 사실상 이 몇 줄로 줄었고, 460만 행 처리 소요시간이 평균 47초 → 대략 3-5초로 빨라졌습니다. 10배 가까이 빨라진 셈이죠. 🥳
단일 데이터소스 처리 시간 비교 (460만 행 기준)
근데 문제가 있었습니다
pantab이 처리 못 하는 케이스가 있었거든요.
| 에러 | 원인 | 건수 |
|---|---|---|
UnicodeDecodeError | Hyper 파일 내 copyright 기호(©, 0xA9) | 3건 |
RuntimeError: table does not exist | 한글 테이블명 + GUID 접미사 조합 | 2건 |
실제 데이터엔 항상 이런 엣지 케이스가 있더라고요. 한글, 특수문자, 레거시 인코딩…
그래서 pantab 1차 → tableauhyperapi fallback 2차 전략으로 갔습니다.
@staticmethod
def _convert_hyper(hyper_path, output_dir):
try:
return _convert_hyper_pantab(hyper_path, output_dir) # 1차: 고속
except Exception as e:
logger.warning(f"pantab 실패 → fallback: {e}")
return _convert_hyper_fallback(hyper_path, output_dir) # 2차: 안정
실제로 가지고 있는 데이터셋으로 테스트해 보니 아래와 같이 처리되었습니다.
- 대부분(88%): pantab C++ 고속 처리
- 예외(12%): tableauhyperapi Python fallback으로 안정 처리
- 100% 성공률 보장
추가: 불필요한 재적재 방지 (Conditional Ingestion)
Tableau TDSX는 구조상 ‘부분 다운로드’가 불가능합니다. 데이터가 1행만 바뀌어도 전체 패키지를 다시 내려받아야 하죠. 하지만 데이터가 전혀 바뀌지 않았는데 매번 이 무거운 전 과정을 반복하는 건 큰 낭비입니다.
그래서 Tableau REST API의 updatedAt 필드를 활용해, 실제로 변경 내역이 있을 때만 전체 공정(다운로드-변환-적재)을 수행하도록 최적화했습니다.
async def _check_skip_ingestion(self, datasource_luid):
# 1. DB 먼저: 마지막 성공 적재 시각 조회 (가벼운 쿼리)
last_success = await repo.find_latest_success(datasource_luid)
if last_success is None:
return False # 이력 없음 → 적재 진행
# 2. Tableau API: updatedAt 조회 (이력이 있을 때만 호출)
ds_meta = await rest_client.get_datasource(datasource_luid)
tableau_updated_at = parse_tableau_datetime(ds_meta["datasource"]["updatedAt"])
# 3. 비교
if tableau_updated_at <= last_success.completed_at:
return True # 변경 없음 → 스킵 (불필요한 전체 다운로드 방지)
return False # 변경 감지 → 적재 진행
설계할 때 신경 쓴 것들:
- Conditional Full Refresh: 부분 업데이트가 안 되는 제약을 인정하고, 대신 ‘수행 여부’를 결정하는 데 집중했습니다.
- DB 조회 → HTTP 호출 순서: 로컬 이력이 없으면 API 호출도 생략하도록 순서를 배치했습니다.
- Fail-open: 체크 로직이 실패하면 보수적으로 ‘적재 진행’을 선택해 데이터 유실을 방지했습니다.
- force 옵션: 가끔 강제로 덮어써야 할 때를 위해
force = true파라미터를 마련해 두었습니다.
결과
전체 파이프라인 소요 시간 비교
| 지표 | Phase 1 (순차) | Phase 2 (병렬) | Phase 3 (pantab) |
|---|---|---|---|
| 총 소요 시간 | - | 61.8s | 38.6s |
| 성공률 | - | 39/41 (95%) | 41/41 (100%) |
| 총 행 수 (~2,000만) | - | 18,236,982 | 18,272,259 |
| 총 테이블 수 | - | 67 | 85 |
| 총 데이터 용량 | - | - | ~300MB |
| 최대 단일 소스 처리 | ~47s | ~47s | ~3-5s |
| (460만 행 기준) |
코드도 많이 줄었습니다.
| 항목 | Before | After |
|---|---|---|
| Hyper 변환 코드 | 73줄 (타입 매핑, 청크, 값 변환) | 30줄 (pantab 호출 + Parquet 저장) |
| 의존성 | tableauhyperapi only | pantab (primary) + tableauhyperapi (fallback) |
이번 작업에서 느낀 것들
1. Python 루프가 병목이다
C++ 엔진 위에 Python row-by-row 루프를 올리면 10-50x 느려지는 케이스가 실제로 있습니다. 가능하면 pantab, polars처럼 C++ 바인딩 라이브러리를 쓰는 게 낫습니다 👍
2. Fallback은 필수다
고속 라이브러리가 엣지 케이스를 다 커버하진 않습니다. 한글, 특수문자, 레거시 인코딩이 실제 현업 데이터에선 항상 등장하더라고요. pantab만 믿었으면 5개 실패했을 겁니다 🤣
3. 병렬화의 핵심은 제약 조건 파악
DuckDB single-writer 제약을 무시하고 동시 적재하면 오히려 느려지거나 에러가 납니다. 다운로드+변환은 병렬, 적재는 큐 기반 순차 — 이게 각 단계의 제약 조건에 맞는 최적 구조였습니다.
4. 증분 체크는 DB 먼저
가벼운 DB 쿼리로 먼저 판단하고, 필요할 때만 외부 API 호출. 순서 하나 바꾸는 것만으로 불필요한 HTTP 요청을 많이 줄일 수 있습니다.
5. Fail-open 설계
최적화 로직이 실패해도 원래 동작(Full Replace)으로 돌아가야 합니다. 증분 체크가 터져서 데이터가 누락되면 최적화가 오히려 장애가 되는 거니까요.