Python DuckDB Tableau asyncio 최적화 파이프라인

대규모 Tableau 데이터소스 처리, 30초면 충분했습니다

최근 준비 중인 POC에서 AI 에이전트가 Tableau의 데이터소스를 직접 Text-to-SQL로 쿼리할 수 있는지 검증하기 위해, 분석용 DB에 RAW 데이터를 적재해 보는 역할이 제게 떨어졌습니다.

Tableau to DuckDB Pipeline 처리 과정

그래서 TableauTableau데이터를 시각화하고 관련 대시보드를 구축하기 위해 널리 쓰이는 업계 표준 BI 플랫폼입니다. Server에 있는 TDSX 파일들을 DuckDBDuckDBSQLite처럼 동작하지만 OLAP(분석용) 쿼리에 최적화된 고성능 임베디드 데이터베이스입니다.에 이관하는 파이프라인을 만들기 시작했는데요. 대상은 41개 데이터소스, 총 약 2,000만 행, 용량으로는 약 300MB 정도의 규모였습니다. 처음에 짠 코드가 예상보다 많이 느렸고, 고치면서 고치다 보니 꽤 재밌는 최적화 포인트들을 발견했습니다.

왜 DuckDB를 선택했냐고요? 일단 PoC라서 임베디드 형태인 DuckDB가 편하기도 했고, 이게 잘 되면 다른 전문 OLAP DB(ClickHouse, StarRocks 등) 도입도 고려하고 있었기 때문입니다.

그 과정을 정리해 봤습니다 :)


왜 DuckDB인가?

일단 왜 DuckDB를 선택했냐면.

  • OLAP 최적화: 컬럼 기반 스토리지라 분석 쿼리에 딱 맞음 (Tableau의 .hyper 파일 역시 컬럼 기반이라 특성이 잘 맞았습니다)
  • 임베디드: 서버 따로 안 띄워도 파일 하나로 동작 이게 정말 중요했습니다.
  • ParquetParquetHadoop 생태계의 컬럼 지향 압축 오픈 소스 파일 포맷으로, 데이터 분석 쿼리에 최적화되어 있습니다. 네이티브: CREATE TABLE AS SELECT * FROM read_parquet(...) 한 줄로 적재 끝
  • 메모리 효율: 억 단위 행도 스트리밍으로 처리 가능

AI 에이전트가 쿼리하는 용도라면 빠른 집계가 중요하니까, OLAP에 최적화된 DuckDB가 자연스러운 선택이었습니다. 🐤

그리고 전체 파이프라인 구조는 이렇습니다.

TDSX 다운로드 → 압축 해제 → .hyper → Parquet 변환 → DuckDB 적재
     ↓                          ↓
  .tds 파싱              메타데이터 저장 (RDB)
  (관계, 캡션, 계산 필드)

Phase 1: 순차 처리 (Baseline)

처음엔 구현이 먼저니까 아래와 같이 제일 단순하게 짰습니다.

for luid in datasource_luids:
    tdsx = download(luid)       # 네트워크 I/O
    parquet = convert(tdsx)     # CPU-bound
    load_to_duckdb(parquet)     # 디스크 I/O

결과는? 네, 당연히 느렸습니다. 이유는 생각보다 더 간단했는데요. 😅

  • 41개나 되는 걸 하나씩 ‘줄 세워’ 처리하니 대기 시간이 끝도 없이 누적
  • 중간에 460만 행짜리 ‘대어’가 하나 걸리면 나머지 전체가 그냥 무한 대기 이건 진짜 아니죠~~
  • 네트워크 I/O 기다릴 땐 CPU가 놀고, CPU 쓸 땐 네트워크가 노는… 아주 비효율적인 상황ㅎㅎ

Phase 2: 병렬 파이프라인

핵심 아이디어

작업을 단계별로 분리해서, 각 단계의 특성에 맞는 동시성 전략을 적용했습니다.

단계특성선택한 동시성
다운로드 + 변환네트워크 I/O + CPUSemaphore(5)
DuckDB 적재Single-writer 제약asyncio.Queue 순차

DuckDB는 동시에 여러 writer가 쓰면 오히려 느려지거나 에러가 납니다. 그래서 다운로드/변환은 병렬로 달리되, 적재만 큐에서 꺼내 순차로 처리하는 구조를 택했습니다.

semaphore = asyncio.Semaphore(5)  # 동시 5개 제한
load_queue = asyncio.Queue()       # 적재 큐

async def process_luid(luid):
    async with semaphore:
        tdsx = await download(luid)
        parquet = await asyncio.to_thread(convert, tdsx)  # CPU-bound → 별도 스레드
        await load_queue.put(parquet)  # 변환 완료 즉시 큐에 넣기

async def load_worker():
    while not done:
        parquet = await load_queue.get()
        await duckdb.load_parquet(parquet)
집중

이때 최적화 포인트 몇 가지를 함께 적용했는데요.

  • httpx 클라이언트 재사용: 매 요청마다 새 클라이언트 생성하면 커넥션 맺는 비용이 계속 발생합니다.
    • 하나의 AsyncClient로 커넥션 풀 공유하도록 변경
  • 즉시 큐 투입: 전체 변환이 끝날 때까지 기다리지 않고, 변환 완료된 것부터 바로 DuckDB 적재 시작
  • 실패 격리: 하나가 실패해도 나머지는 계속 진행
  • 임시 파일 정리: TDSX 원본은 변환 후 즉시 삭제, Parquet은 적재 후 삭제

Phase 3: Hyper 변환 최적화 (pantab)

What is pantab?

pantab는 Tableau의 .hyper 파일과 Python의 Pandas/Arrow 데이터 구조를 연결해주는 오픈소스 라이브러리입니다. 단순히 데이터를 읽어오는 기능을 넘어, Hyper SDK의 C++ 엔진을 직접 활용하여 대량의 데이터를 메모리 레벨에서 고속으로 변환할 수 있도록 설계되었습니다.

병렬화 후에도 병목 현상이 완전히 해결된 것은 아니더라고요! 변환 단계가 여전히 느렸습니다. 특히 460만 행짜리 데이터소스가 문제였어요 🤣

현재까지 구현된 기존 코드를 보면 이렇게 되어 있었습니다.

# tableauhyperapi (Python row-by-row)
with HyperProcess(...) as hyper:
    with Connection(...) as conn:
        with conn.execute_query("SELECT * FROM ...") as result:
            while result.next_row():              # ← Python 루프
                values = result.get_values()       # ← 행 단위 읽기
                for j, idx in enumerate(col_indices):
                    batch[j].append(convert(values[idx]))  # ← 값 변환

C++ 엔진 위에 Python 루프를 올려놓은 건데, Python 루프가 C++ 속도를 다 죽이고 있던 거였습니다 🤫

여기서 pantab을 도입했습니다. pantab은 Hyper C++ API를 직접 바인딩해서 Python 루프 없이 Hyper → Arrow 메모리 포맷으로 바로 변환해 줍니다.

# pantab (C++ 직접 변환)
tables = pantab.frames_from_hyper(hyper_path, return_type="pyarrow")
for table_key, arrow_table in tables.items():
    pq.write_table(arrow_table, parquet_path)  # ← 끝

73줄짜리 변환 코드가 사실상 이 몇 줄로 줄었고, 460만 행 처리 소요시간이 평균 47초 → 대략 3-5초로 빨라졌습니다. 10배 가까이 빨라진 셈이죠. 🥳

단일 데이터소스 처리 시간 비교 (460만 행 기준)

근데 문제가 있었습니다

pantab이 처리 못 하는 케이스가 있었거든요.

에러원인건수
UnicodeDecodeErrorHyper 파일 내 copyright 기호(©, 0xA9)3건
RuntimeError: table does not exist한글 테이블명 + GUID 접미사 조합2건

실제 데이터엔 항상 이런 엣지 케이스가 있더라고요. 한글, 특수문자, 레거시 인코딩…

그래서 pantab 1차 → tableauhyperapi fallback 2차 전략으로 갔습니다.

@staticmethod
def _convert_hyper(hyper_path, output_dir):
    try:
        return _convert_hyper_pantab(hyper_path, output_dir)      # 1차: 고속
    except Exception as e:
        logger.warning(f"pantab 실패 → fallback: {e}")
        return _convert_hyper_fallback(hyper_path, output_dir)    # 2차: 안정

실제로 가지고 있는 데이터셋으로 테스트해 보니 아래와 같이 처리되었습니다.

  • 대부분(88%): pantab C++ 고속 처리
  • 예외(12%): tableauhyperapi Python fallback으로 안정 처리
  • 100% 성공률 보장
역시나야

추가: 불필요한 재적재 방지 (Conditional Ingestion)

Tableau TDSX는 구조상 ‘부분 다운로드’가 불가능합니다. 데이터가 1행만 바뀌어도 전체 패키지를 다시 내려받아야 하죠. 하지만 데이터가 전혀 바뀌지 않았는데 매번 이 무거운 전 과정을 반복하는 건 큰 낭비입니다.

그래서 Tableau REST API의 updatedAt 필드를 활용해, 실제로 변경 내역이 있을 때만 전체 공정(다운로드-변환-적재)을 수행하도록 최적화했습니다.

async def _check_skip_ingestion(self, datasource_luid):
    # 1. DB 먼저: 마지막 성공 적재 시각 조회 (가벼운 쿼리)
    last_success = await repo.find_latest_success(datasource_luid)
    if last_success is None:
        return False  # 이력 없음 → 적재 진행

    # 2. Tableau API: updatedAt 조회 (이력이 있을 때만 호출)
    ds_meta = await rest_client.get_datasource(datasource_luid)
    tableau_updated_at = parse_tableau_datetime(ds_meta["datasource"]["updatedAt"])

    # 3. 비교
    if tableau_updated_at <= last_success.completed_at:
        return True   # 변경 없음 → 스킵 (불필요한 전체 다운로드 방지)
    return False      # 변경 감지 → 적재 진행

설계할 때 신경 쓴 것들:

  • Conditional Full Refresh: 부분 업데이트가 안 되는 제약을 인정하고, 대신 ‘수행 여부’를 결정하는 데 집중했습니다.
  • DB 조회 → HTTP 호출 순서: 로컬 이력이 없으면 API 호출도 생략하도록 순서를 배치했습니다.
  • Fail-open: 체크 로직이 실패하면 보수적으로 ‘적재 진행’을 선택해 데이터 유실을 방지했습니다.
  • force 옵션: 가끔 강제로 덮어써야 할 때를 위해 force = true 파라미터를 마련해 두었습니다.

결과

전체 파이프라인 소요 시간 비교

지표Phase 1 (순차)Phase 2 (병렬)Phase 3 (pantab)
총 소요 시간-61.8s38.6s
성공률-39/41 (95%)41/41 (100%)
총 행 수 (~2,000만)-18,236,98218,272,259
총 테이블 수-6785
총 데이터 용량--~300MB
최대 단일 소스 처리~47s~47s~3-5s
(460만 행 기준)

코드도 많이 줄었습니다.

항목BeforeAfter
Hyper 변환 코드73줄 (타입 매핑, 청크, 값 변환)30줄 (pantab 호출 + Parquet 저장)
의존성tableauhyperapi onlypantab (primary) + tableauhyperapi (fallback)

이번 작업에서 느낀 것들

1. Python 루프가 병목이다

C++ 엔진 위에 Python row-by-row 루프를 올리면 10-50x 느려지는 케이스가 실제로 있습니다. 가능하면 pantab, polars처럼 C++ 바인딩 라이브러리를 쓰는 게 낫습니다 👍

2. Fallback은 필수다

고속 라이브러리가 엣지 케이스를 다 커버하진 않습니다. 한글, 특수문자, 레거시 인코딩이 실제 현업 데이터에선 항상 등장하더라고요. pantab만 믿었으면 5개 실패했을 겁니다 🤣

3. 병렬화의 핵심은 제약 조건 파악

DuckDB single-writer 제약을 무시하고 동시 적재하면 오히려 느려지거나 에러가 납니다. 다운로드+변환은 병렬, 적재는 큐 기반 순차 — 이게 각 단계의 제약 조건에 맞는 최적 구조였습니다.

4. 증분 체크는 DB 먼저

가벼운 DB 쿼리로 먼저 판단하고, 필요할 때만 외부 API 호출. 순서 하나 바꾸는 것만으로 불필요한 HTTP 요청을 많이 줄일 수 있습니다.

5. Fail-open 설계

최적화 로직이 실패해도 원래 동작(Full Replace)으로 돌아가야 합니다. 증분 체크가 터져서 데이터가 누락되면 최적화가 오히려 장애가 되는 거니까요.