Gorio Tech Blog search

HGT(Heterogeneous Graph Transformer) 설명

|

이번 글에서는 HGT란 알고리즘에 대해 다뤄보겠다. 상세한 내용은 논문 원본을 참고하길 바라며, 본 글에서는 핵심적인 부분에 대해 요약 정리하도록 할 것이다. 저자의 코드는 Github에서 확인할 수 있다.


Heterogeneous Graph Transformer 설명

1. Background

기존의 많은 GNN이 Homogenous Graph에만 집중된 것에 반해, HGT는 여러 node type, edge type을 가진 Heterogenous Graph 데이터에 대해 적합한 알고리즘으로 제안되었다.

Heterogenous Graph에 대한 접근법은 여러 가지가 있지만 대표적으로 meta-path를 이용한 방법과 GNN을 이용한 방법이 존재한다. 그런데 이러한 방법에는 몇 가지 결점이 존재한다.

  • heterogenous graph의 각 type에 맞는 meta-path design을 하려면 구체적인 domain 지식이 필요하다.
  • 다른 type의 node/edge가 같은 feature를 공유하거나, 혹은 아예 다른 feature를 갖는 경우 graph의 특징을 온전히 포착하기는 어렵다.
  • 모든 graph의 동적 특성은 대부분 무시되고 있다.

HGT의 목표는 다음과 같다.

  • Network dynamics는 포착하면서 각 node/edge-type에 적합한 representation 학습
  • customized meta-path를 특별히 설계하지 않음
  • Web-scale graph에 적합하도록 highly scalable할 것

2. Heterogenous Graph Mining

Heterogenous Graph의 정의에 대해 살펴보자.

[G = (\mathcal{V}, \mathcal{E}, \mathcal{A}, \mathcal{R})]

각 집합은 node, edge, node type, edge type을 의미한다. 이 때 각 node $v \in \mathcal{V}$ 이고, 각 edge $e \in \mathcal{E}$ 이다. 그리고 다음과 같은 type mapping 함수가 존재한다.

[\tau(v): V \rightarrow \mathcal{A}, \phi(e): E \rightarrow \mathcal{R}]

본격적인 구조 설명에 앞서 몇 가지 개념들에 대해 짚고 넘어간다.

Meta Relation
edge $e = (s, t)$ 가 존재할 때, 각 node $s, t$ 는 물론 edge $e$ 도 각자의 type을 가질 것이다. 이 때 이들 관계의 meta relation은 아래와 같이 표현할 수 있다.

[<\tau(s), \phi(e), \tau(t)>]

이는 기존의 여러 meta-path 방법론에서도 설명된 개념이다. 3개의 요소 모두가 같아야만 같은 관계로 인식된다. 그런데 HGT는 여기에서 시간의 개념을 추가한다.

Dynamic Heterogenous Graph
앞서 예시로 들었던 edge $e=(s, t)$ 에 timestamp $T$ 를 할당해보자. 이는 node $s$ 가 $T$ 시점에 node $t$ 와 연결되었음을 의미한다. 이러한 관계가 처음으로 나타났다면 $s$ 에게 $T$ 시점이 할당된다. 물론 node $s$ 가 여러 번 연결된다면 복수의 timestamp를 갖게 될 것이다.

이는 edge의 timestamp는 불변함을 의미한다. 당연하다. 예를 들어 어떤 논문이 WWW에 1994년에 등재되었다면, 이 때의 timestamp는 1994년인 것이다.


3. Heterogenous Graph Transformer

HGT의 목표는 source node로 부터 정보를 통합하여 target node $t$ 에 대한 contextualized representation을 얻는 것이다.

3.1. Heterogenous Message Passing & Aggregation

아래 그림은 전체적인 구조를 나타낸다. 총 $L$ 개의 Layer를 쌓는 방식으로 되어 있고, $H^l$ 은 $l$ 번째 HGT layer의 output이다.

(1), (2), (3)으로 구분되어 있듯이 이 과정은 크게 3가지로 구분되며, 효과적인 학습을 위해 3가지의 추가적인 장치가 배치된다. 추가적인 장치는 3.2에서 설명하도록 하겠다.

Step1: Heterogenous Mutual Attention
Step2: Heterogenous Message Passingg
Step3: Target-specific Aggregation

일단 주어진 상황은 다음과 같다. 특정 target node $t$ 가 존재할 때, 2개의 source node $s_1, s_2$ 가 $e_1, e_2$ 라는 edge를 통해 target node와 관계를 맺고 있는 것이다. 이 때 node인 $t, s_1, s_2$ 의 경우 node feature 벡터를 갖는다. (node feature가 없으면 인위적으로 생성해야 한다.) 각 feature 벡터의 길이는 일단 $d$ 로 같다고 가정한다. 실제로는 최초의 Projection Layer에서 같은 길이로 통일되기 때문에 node type별로 다른 feature 길이를 가져도 무방하다. 어쨌든 지금은 $d$ 라는 길이로 통일되어 있다고 생각하자. 그렇다면 지금까지의 이야기로 2개의 meta relation이 존재하는 것이다.

[<\tau(s_1), \phi(e_1), \tau(t)>, <\tau(s_2), \phi(e_2), \tau(t)>]

1번째 meta relation을 기준으로 이야기를 이어나가 보겠다. Step1, 2에서 해야할 일은 source node $s_1$ 이 $e_1$ 이라는 edge를 통해 target node $t$ 에 주는 영향력을 수식으로 나타내는 것이다. 이는 Multi-head Attention으로 구현되는데, 기존의 Vanilla Transformer를 사용하면 다른 source/target node, 여러 node type 모두 같은 feature distribution을 공유하게 되므로 이는 현재 상황에 적합한 세팅은 아니다.

Step1: Heterogenous Mutual Attention

이러한 단점을 보완하기 위해 Heterogenous Mutual Attention 메커니즘이 도입된다. 이 메커니즘은 Multi-head Attention의 핵심 구조는 그대로 따르지만 몇 가지 차이점이 있다. 먼저 target node 벡터와 source node 벡터는 각각 Query 벡터, Key 벡터로 매핑되는데 이 때 각각의 node type에 따라 projection weight parameter가 다르다. 즉 만약 node type이 10개 있다고 하면, Query 벡터를 만들기 위한 weight matrix는 기본적으로 10 종류가 있는 것이다. (후에 여기에 attention head 수를 곱해야 한다.)

여기가 끝이 아니다. edge type도 weight parameter를 구분한다. $W_{\phi(e)}^{ATT}$ 가 edge type에 dependent한 weight으로 Query 벡터와 Key 벡터의 유사도를 반영한다. 지금까지 설명한 것을 식으로 보자.

[Attention(s, e, t) = Softmax_{\forall s \in N(t)} ( \Vert_{i \in [1, h]} ATT {\text -} head^i(s, e, t) )]

[ATT {\text -} head^i(s, e, t) = (K^i(s) W_{\phi(e)}^{ATT} Q^i(t)^T)) \cdot \frac{\mu <\tau(s), \phi(e), \tau(t)>}{\sqrt{d}}]

2번째 식을 $h$ 개 만들고 이를 concat한 뒤 target별로 softmax 함수를 적용한 것이 최종 결과이다. 즉 2번째 식은 head 1개에 대한 결과물을 의미한다.

식의 좌측이 앞서 설명한 부분으로 아래와 같이 좀 더 세부적으로 표현할 수 있다.

[K^i(s) = K {\text -} Linear^i_{\tau(s)} (H^{l-1}[s])]

[Q^i(t) = Q {\text -} Linear^i_{\tau(t)} (H^{l-1}[t])]

위는 node type에 따라 weight를 구분하는 projection layer다. 이전 layer의 결과물을 받아 linear layer 하나를 통과시켜 Query/Key 벡터를 얻는다. 최종적으로 $h$ 개의 attention head를 얻기 때문에 Query/Key 벡터는 $\mathcal{R}^d \rightarrow \mathcal{R}^{\frac{d}{h}}$ 로 바뀐다. $W_{\phi(e)}^{ATT}$ 는 $\mathcal{R}^{\frac{d}{h}, \frac{d}{h}}$ 의 형상을 갖는다.

지금까지의 과정을 종합해보면, 이 Heterogenous Mutual Attention 메커니즘이 여러 종류의 semantic relation을 충분히 포착할 수 있을 구조를 갖고 있다는 느낌이 들기 시작한다. 그 효용성에 대해서는 검증해보아야겠지만 일단 장치는 마련해둔 셈이다.

2번째 식에서 우측을 보면 아래와 같은 수식이 있다.

[\frac{\mu <\tau(s), \phi(e), \tau(t)>}{\sqrt{d}}]

논문에서는 이 식을 prior tensor라고 지칭하고 있다. 생각해보면, 모든 node/edge type이 같은 영향력을 지니지는 않을 것이다. 즉 데이터 전반을 볼 때 특정 node/edge type이 더 강한 영향력을 지닐 수도 있는 것이다. 이를 반영하기 위해 만들어진 tensor라고 생각하면 된다. 이 tensor를 통해 attention에 대해 adaptive scaling을 수행한다.

코드를 잠시 보고 지나가겠다.

self.relation_pri = nn.Parameter(torch.ones(num_relations, self.n_heads))

논문 원본에는 모든 node/edge type에 따른 prior를 부여하였는데, 저자의 코드는 이를 좀 단순화하여 나타냈다. 만약 원본 코드를 사용할 계획이라면 상황에 따라 수정을 가할 수도 있을 것이다. 만약 수정을 원한다면 아래 부분에서 행렬 곱 연산을 수행한 후에 합 연산을 수행하는 형태로 바꿔줘야 한다.

res_att[idx] = (q_mat * k_mat).sum(dim=-1) * self.relation_pri[relation_type] / self.sqrt_dk

코드에 대한 자세한 리뷰는 추후에 업로드하도록 하겠다.

앞서 언급하였듯이 최종 단계에서의 softmax는 target node를 기준으로 이루어지기 때문에 현재와 같이 1번째 meta relation 만을 기준으로 연산을 수행한다면 각각의 head에 대해 softmax가 아닌 sigmoid 함수가 적용되게 될 것이다. 2번째 meta relation까지 한번에 계산했다면 2개의 target node에 대해 softmax 함수가 적용되어 결과물로 길이 $h$ 의 벡터가 2개 주어질 것이다.

이를 그림으로 나타내면 아래와 같다.

Step2: Heterogenous Message Passing

이전 섹션에서 주어진 edge type 하에서 source node와 target node 사이의 유사성에 대해 계산하였다면, 본 섹션에서는 이제 source node로부터 수집한 정보를 target node로 전달할 차례이다. 이 때 일반적인 honogenous graph network에서는 이러한 정보가 동일한 파라미터를 통해 업데이트되었겠지만, Heterogenous Message Passing 메커니즘에서는 source node type과 edge type에 따라 다른 파라미터를 지정하여 진행하게 된다.

[Message(s, e, t) = \Vert_{i \in [1, h]} MSG {\text -} head^i(s, e, t)]

[MSG {\text -} head^i(s, e, t) = M {\text -} Linear^i_{\tau(s)} (H^{l-1}[s]) W_{\phi(e)}^{MSG}]

Mutual Attention 과정을 보았기 때문에 특별히 어려울 것은 없다. 다만 위 수식에서의 M은 Transformer의 V 부분을 의미하고 실제 저자의 코드에서는 V-Linear라고 표기되어 있음에 유의하자.

Step3: Target-specific Aggregation

이전 2개 과정을 통해 attention score와 message를 수집/계산하였다. 지금부터는 이를 target node에 맞춰 통합하는 과정으로 이어진다.

[<\tau(s_1), \phi(e_1), \tau(t)>, <\tau(s_2), \phi(e_2), \tau(t)>]

위 meta relation 들에 대하여 attention score와 message를 모두 얻었다면 이 둘을 곱하여 updated vector를 얻을 차례이다.

[\tilde{H}^l [t] = \oplus_{\forall s \in N(t)} ( {\text Attention}(s, e, t) \cdot {\text Message} (s, e, t) )]

이 과정을 거치면 다른 feature distribution을 갖는 source node의 이웃들이 target node $t$ 로 정보를 통합하게 될 것이다. 이제 target node $t$ 의 updated vector를 맞는 type에 따라 다시 한 번 linear layer를 통과시킨다. 그리고 이전 layer의 output을 직접적으로 더해주어 residual connection 또한 추가해준다.

[H^l[t] = A {\text -}Linear_{\tau(t)} ( \sigma (\tilde{H}^l [t]) + H^{l-1}[t] )]

이와 같은 과정을 $L$ 번 반복해주면 바로 그 결과물이 target node의 contextualized representaion이 되는 것이고, 이를 통해 node classification이나 link prediction과 같은 downstream task에 활용하면 된다.

정리를 좀 해보면, HGT는 분명 meta relation을 활용하여 각 type에 맞는 weight matrix를 따로 설정하고 이를 자연스럽게 통합하고 있다. 다만 이렇게 별개의 weight matrix를 모두 만들 경우 분명 model이 무거워지는 것은 사실이다. 따라서 실제 적용 시에는 이러한 부분에 대해 유의해야 할 것이다.

3.2. Additional setting

앞서 효과적인 학습을 위해 3가지의 추가적인 장치가 구현되어 있다고 언급한 바 있다. 이제 이 부분에 대해 살펴볼 것이다.

1번째 장치: Relative Temporal Encoding

RTE는 graph dynamic을 다루기 위해 도입된 개념이다. 시간적 정보를 활용하기 위해 시간대 별로 분리된 graph를 구성하는 이전의 방법은 여러 time slot간의 구조적인 연결성을 잃어버리기 때문에 효과적인 방법으로 보기 어렵다. dynamic graph를 모델링하는 것의 핵심은 바로 모든 node/edge가 다른 timestamp에서도 서로 상호작용할 수 있게 만들어주는 것이다.

이러한 철학은 Transformer의 positional encoding을 변형하여 구현된다.

source node $s$ 의 timestamp는 $T(s)$ 이고 target node $t$ 의 timestamp는 $T(t)$ 이다. 이 둘 사이의 relative time gap은 $\Delta T(t, s) = T(t) - T(s)$ 로 표현할 수 있고, relative temporal encoding을 $RTE(\Delta T(t, s))$ 라고 표기한다.

학습 데이터셋이 모든 time gap을 커버하는 것은 아니기 때문에 RTE는 본 적 없는 시간에 대해서도 일반화할 수 있어야 한다. 이를 위해 논문에서는 sinusoid 함수를 basis로 놓고 학습 가능한 projection layer를 하나 더 둔다.

[Base(\Delta T(t, s), 2i) = sin (\Delta T(t, s) / 10000^{\frac{2i}{d}})]

[Base(\Delta T(t, s), 2i+1) = cos (\Delta T(t, s) / 10000^{\frac{2i+1}{d}})]

[RTE(\Delta T(t, s)) = T{\text -}Linear (Base(\Delta T(t, s)))]

최종적으로 target node $t$ 에 대한 temporal encoding은 source node $s$ 의 representation에 더해진다.

[\hat{H}^{l-1} [s] = H^{l-1}[s] + RTE(\Delta T(t, s))]

이 과정을 그림으로 나타내면 아래와 같다.

코드로 구현하면 아래와 같다.

class RelTemporalEncoding(nn.Module):
    # Implement the Temporal Encoding (Sinusoid) function.
    def __init__(self, n_hid, max_len=240):
        super(RelTemporalEncoding, self).__init__()
        position = torch.arange(0., max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, n_hid, 2) * -(math.log(10000.0) / n_hid))

        emb = nn.Embedding(max_len, n_hid)
        emb.weight.data[:, 0::2] = torch.sin(position * div_term) / math.sqrt(n_hid)
        emb.weight.data[:, 1::2] = torch.cos(position * div_term) / math.sqrt(n_hid)
        emb.requires_grad = False

        self.emb = emb
        self.lin = nn.Linear(n_hid, n_hid)
    def forward(self, x, t):
        return x + self.lin(self.emb(t))

edge_time이 주어지고, meta relation이 주어졌을 때 temporal encoding이 source node $s$ 의 representation에 더해지는 과정은 아래와 같이 구현된다.

rte = RelTemporalEncoding(n_hid=10)

source_node_vec = rte(source_node_vec, edge_time[idx])

2번째 장치: HGSampling

지금부터 설명할 2개의 방법론은 모두 scalibility를 향상시키기 위해 도입된 장치들이다.

작은 graph가 아니라면 full-batch GNN 학습은 현실적으로 힘든 경우가 많다. 그리고 속도와 효율을 중시하는 실제 서비스에 적용하기에는 부담스러운 측면도 있다. 이를 위해 sampling 방법이 많이 도입 되었는데, 이를 node-level로 추출할 수도 있고, grpah-level로 추출할 수도 있다. 이와 관련된 연구는 매우 많지만 node-level sampling을 적용한 알고리즘으로는 GraphSAGE, PinSAGE가 있고, IGMC, GraphSAINT는 graph-level sampling을 적용하였다.

그런데 앞서 소개한 방법들을 그대로 heterogenous graph에 적용할 경우, 만약 node type별 분포가 크게 다를 경우 상당히 불균형적이고 왜곡된 subgraph가 추출될 수 있다. 따라서 이를 위해 HGSampling이라는 방법이 제안된다.

HGSampling은 각 node/edge type에 속하는 수를 유사하게 맞춰주면서 정보 손실을 최소화하고 sample variance를 줄이기 위해 추출된 subgraph를 dense하게 유지할 수 있는 알고리즘이다.

아래 도식에서 HGSampling의 수행 과정을 살펴볼 수 있다. 기본적으로 각 node type $\tau{r}$ 에 대해 budget 딕셔너리 $B[\tau{r}]$ 를 만들어준다. 그리고 중요도 기반의 sampling을 사용하여 node type 별로 같은 수의 node를 추출해준다.

node $t$ 가 이미 추출되었다고 할 때, 이 node의 직접적인 이웃들을 모두 상응하는 budget에 추가해준다. 추가하는 방식은 아래와 같다.

그리고 이 이웃들에게는 node $t$ 의 normalized degree를 더해준다. 이 값은 추후에 sampling 확률을 계산하기 위해 사용된다. 이러한 normalization은 high-degree node에 의해 크게 좌지우지 되는 sampling 방식을 피하면서도, 이웃들에 대한 각 sampled node의 random walk 확률을 모으는 것과 같은 효과를 지닌다.

budget이 업데이트된 이후, 알고리즘1의 line9에서 각 budget 속에 있는 각 node $s$ 의 cumulative normalized degree를 계산한다. 이러한 sampling probability는 sampling variance를 감소시키는 효과를 지닌다.

이후 계산된 확률에 기반하여 type $\tau$ 의 $n$ 개의 node를 추출하고 이를 output node set에 추가하고, 이웃목록을 budget에 업데이트한다. 추출된 node는 budget에서 제거된다.

최종적으로 우리는 이렇게 sampled된 node에 기반하여 adjacency matrix를 다시 생성한다.

위와 같은 과정으로 생성한 subgraph는 node type별로 유사한 수의 node를 갖게 되고, sampling variance를 감소시킬 수 있을만큼 충분히 dense하다. 따라서 이를 활용하여 web-scale heterogenous graph를 효과적으로 학습할 수 있다.

그림으로 표현하면 아래와 같다.

3번째 장치: Inductive Timestamp Assignmant

지금까지 우리는 각 node $t$가 timestamp $T(t)$ 에 assign되었다고 가정했는데 실 세계의 heterogenous graph에서 많은 node들은 고정된 시간으로 묶여있지 않다. 따라서 그러한 node에게는 다른 timestamp 들을 할당해주어야 하는데, 이러한 node를 plain nodes라고 한다.

또한 명시적인 timestamp를 갖고 있는 node들이 있는데 이들을 event nodes라고 한다. 본 논문에서는 event node의 timestamp에 기반하여 plain node의 timestamp를 assign하는 inductive timestamp assignment 알고리즘을 제안한다. plain node는 event node로 부터 timestamp를 상속받게 된다.


4. Evaluation and Conclusion

Comment  Read more

SQL 기본

|

이번 포스팅에서는 SQL 기본 구문에 대해 정리해본다. 평소에 자주 사용하지만 정확히 숙지하지 못한 부분들을 정리하는 데에 목적이 있으며 아주 기초적인 부분은 생략한다.


SQL 기본

1. Basic

SELECT문의 기본적인 쓰임은 아래와 같다. 순서를 지켜야 한다.

/* 블록 주석 */
--한줄주석
SELECT select_expr
FROM table_reference
WHERE where_condition
GROUP BY column_name/expression/position
HAVING where_condition
ORDER BY column_name/expression/position

WHERE 절 뒤에는 조건/관계 연산자가 오게 된다. 조건 연산자에는 =, <, >, <=, >=, != 가 있고, 관계 연산자에는 or, and, not, between A and B, in, not in 가 있다.

ORDER BY는 정렬을 위해 사용되며, 큰 테이블에 직접적으로 사용하는 것은 많은 부하를 유발하게 된다.

--ORDER BY height ASC
--ORDER BY height DESC
SELECT name
FROM table
ORDER BY height asc, weight desc

위와 같이 2개 이상의 칼럼으로 정렬할 때는 왼쪽부터 우선순위를 갖는다.

LIMIT 5 OFFSET 3

위 구문은 3번째 row부터 5개를 반환한다는 뜻을 갖는다.

GROUP BY는 기준을 세워 집계하는 용도로 사용한다. 집계 함수에는 count, sum, avg, min, max 등이 있다. 예시는 아래와 같다.

SELECT user_id, sum(amount*price)
FROM table
GROUP BY user_id
HAVING sum(amount * price) > 1500;

GROUP BY에서 where 조건 절을 추가하고 싶다면 위와 같이 HAVING을 이용하면 된다.

NULL을 처리하는 방법을 정리한다. IS NULL은 null 여부를 조건식으로 나타낸다.

--IS (NOT) NULL
SELECT * FROM table WHERE name IS NULL
-- SELECT * FROM table WHERE name IS NOT NULL

IF NULL의 경우 특정 칼럼이 null값이 아니면 그 값 그대로, null이면 2번째 인자 값으로 채워주는 기능을 갖는다.

--IFNULL (혹은 NVL, ISNULL)
SELECT animal_type, IFNULL(NAME, no name) as name
FROM table
--→ null이면 ‘no_name’ 아니면 NAME 그대로

COALESCE는 정의된 열 중 null이 아닌 1번째 값을 출력한다.

SELECT animal_type, COALESCE(NAME, TAG, nothing) as KEY
FROM table
--→ NAME에 값이 있으면 NAME의 값을, NAME에 값이 없지만 TAG에 값이 있으면 TAG의 값을 출력함

IF는 말 그대로 조건식을 나타내는데, True일 경우 2번째 인자 값을 False일 경우 3번째 인자 값을 반환한다.

SELECT name, IF(price > 100, expensive, normal) as judgement
FROM table

LIKE의 몇 가지 사용법은 아래와 같다.

WHERE user_id like a%
  • ‘a%’: a로 시작하는 모든 것
  • ‘_a%’: 2번재 자리에 a가 들어가는 모든 것
  • ‘a_%_%’: a로 시작하고, 길이가 최소 3이상인 것

서브쿼리는 쿼리 속의 쿼리이다. 서브쿼리의 결과물은 여러 값일 경우 이를 조건으로 사용하기 힘들 때가 있다. 이 때 ANY, ALL, SOME을 이용할 수 있다.

  • ANY(SOME): 서브쿼리의 결과 중 하나라도 해당되면 OK (or의 역할)
  • ALL: 서브쿼리의 결과 모두에 해당해야 함 (and의 역할)

2. Case

조건을 쓰고 이를 만족했을 때와 만족하지 않았을 때의 반환 값을 지정하여 하나의 칼럼으로 생성하는 구문이다. ELSE를 생략하면 Null 값이 반환될 수 있다.

SELECT user_id,
    CASE
        WHEN (age BETWEEN 0 and 40) THEN 'YOUNG'
        WHEN (age BETWEEN 41 and 70) THEN
            CASE
                WHEN (mind = 'good') THEN 'not young but good mind'
                ELSE 'not young and bad mind'
            END
        ELSE '?'
    END AS condition
FROM table

위 예시와 같이 기본적으로 CASE ~ END라는 큰 틀 안에 조건식을 채워넣는 개념이며, 조건식은 WHEN ~ THEN ~ + ELSE로 짜여지게 된다.


3. With

WITH는 특정 질의 결과에 이름을 부여하여 재사용을 가능하게 함과 동시에 가독성을 높여준다. 또한 WITH 구문을 사용할 경우 반복적으로 실행 계획을 수립하는 것이 아니기 대문에 쿼리의 성능 또한 향상된다.

WITH 구문은 여러 번 연속해서 쓸 수도 있다. 아래 예시를 참고하자.

WITH ex1 AS (
    query
),
ex2 AS (
    query
)

반복적으로 쿼리를 수행하는 WITH RECURSIVE 구문의 좋은 예시는 아래와 같다.

WITH RECURSIVE TIMETABLE AS (
    SELECT 0 AS HOUR FROM DUAL
    UNION ALL
    SELECT HOUR + 1 FROM TIMETABLE
    WHERE HOUR < 23
)

SELECT T.HOUR, IFNULL(COUNT, 0) as COUNT
FROM TIMETABLE as T
    LEFT JOIN (
        SELECT HOUR(datetime) as HOUR, COUNT(animal_id) as COUNT
        FROM animal_outs
        GROUP BY HOUR
        HAVING HOUR BETWEEN 0 and 23
        ORDER BY HOUR
    ) AS animal
    ON T.HOUR = animal.HOUR

4. Join

INNER JOIN의 틀은 다음과 같다.

SELECT col
FROM table1
  INNER JOIN table2
  ON table1.key = table2.key
WHERE condition
GROUP BY
HAVING
ORDER BY

OUTER JOIN의 틀은 다음과 같다.

SELECT col
FROM table1
  <LEFT/RIGHT/FULL> JOIN table2
  ON table1.key = table2.key
WHERE condition
GROUP BY
HAVING
ORDER BY

5. 문자열 함수

SUBSTRING은 문자열을 시작위치로부터 자르는 함수이다.

SELECT SUBSTRING('1234', 2);
-- 12 

SELECT SUBSTRING('abc', -2);
-- cb

SELECT SUBSTRING('12345', 2, 2);
-- 23

SUBSTRING_INDEX는 문자열을 자른 후 주어진 INDEX까지의 문자를 추출한다.

SELECT SUBSTRING_INDEX('이름,나이,시간', ',', 2)
-- 이름, 나이

위의 두 함수를 조합하면 split & get element를 구현할 수 있다.

SELECT SUBSTRING_INDEX(SUBSTRING_INDEX('이름,나이,시간', ',', 2), ',', -1)
-- 나이

CHAR_LENGTH는 string의 길이를 문자 단위로 반환한다. 이 함수의 경우 bytes도 문자 수로 반환하는데, LENGTH 함수는 바이트는 바이트 수로 반환하는 차이점이 있다.

CONCAT은 여러 값을 하나의 결과로 연결한다.

SELECT CONCAT('Summer', ' ', 1923) as release_date
-- Summer 1923

CONTAINS_SUBSTR은 대소문자를 구분하지 않는 정규화된 검색을 수행하여 값이 표현식에 있는지 확인한다. 값이 있으면 True, 없으면 False를 반환한다.

SELECT CONTAINS_SUBSTR('the blue house', 'Blue house') AS result
-- True

SELECT * FROM Recipes WHERE CONTAINS_SUBSTR(Recipes, 'toast')
+-------------------+-------------------------+------------------+
| Breakfast         | Lunch                   | Dinner           |
+-------------------+-------------------------+------------------+
| Potato pancakes   | Toasted cheese sandwich | Beef stroganoff  |
| Avocado toast     | Tomato soup             | Blueberry samon  |
+-------------------+-------------------------+------------------+

2번째 값이 1번째 값의 suffix이면 ENDS_WITH 함수는 True를 반환한다. STARTS_WITH는 당연히 반대로 생각하면 된다.

WITH items AS
  (SELECT 'apple' as item
  UNION ALL
  SELECT 'banana' as item
  UNION ALL
  SELECT 'orange' as item)

SELECT
  ENDS_WITH(item, 'e') as example
FROM items;

+---------+
| example |
+---------+
|    True |
|   False |
|    True |
+---------+

LEFT, RIGHT 함수는 아래와 같은 쓰임새를 갖는다.

WITH examples AS
(SELECT 'apple' as example
UNION ALL
SELECT 'banana' as example
)
SELECT example, LEFT(example, 3) AS left_example
FROM examples;

+---------+--------------+
| example | left_example |
+---------+--------------+
| apple   | app          |
| banana  | ban          |
+---------+--------------+

LTRIM, RTRIM, TRIM은 공백을 제거할 때 사용된다.

REPLACE 함수는 문자열 값을 바꾸는 함수이다.

WITH desserts AS
  (SELECT 'apple pie' as dessert
  UNION ALL
  SELECT 'cherry pie' as dessert)

SELECT REPLACE(dessert, 'pie', 'cobbler') as example
FROM desserts;

+--------------------+
| example            |
+--------------------+
| apple cobbler      |
| cherry cobbler     |
+--------------------+

TRANSLATE 함수는 REPLACE와 비슷하지만 차이가 있다. REPLACE 함수는 위 문자열을 하나로 취급한다. 즉, 위 예시에 나온 pie라는 문자열이 정확히 존재해야만 변환이 일어난다. 그러나 TRANSLATE 함수는 각 문자 하나 하나에 대한 변환을 수행한다.

SELECT TRANSLATE('abcde', 'ce', 'XY') FROM DUAL
-- abXdY

REPEAT은 지정한 수 만큼 문자열을 반복 반환한다.

SELECT REPEAT('abc', 3) as REPEAT
-- abcabcabc

REVERSE는 문자열을 역으로 반환한다.

SPLIT 함수는 delimiter 인수를 사용하여 value를 분할하여 array로 반환한다.

WITH letters AS
  (SELECT 'b c d' as letter_group)

SELECT SPLIT(letter_group, ' ') as example
FROM letters;

+----------------------+
| example              |
+----------------------+
| [b, c, d]            |
+----------------------+

6. 집합 연산자

Union은 복수의 쿼리 결과를 합친다. 중복을 제거하기 때문에 만약 중복을 그대로 유지하고 싶다면 Union All을 사용하면 된다.

SELECT *
FROM jan
UNION ALL
SELECT *
FROM mar

INTERSECTMINUS는 각각 교집합, 차집합 결과를 반환한다.


7. 날짜 함수

DATE_FORMAT 함수는 DATE_FORMAT(DATETIME, ‘%Y-%m-%d’)와 같이 사용하며, datetime 칼럼을 원하는 형식에 맞게 변환할 수 있다.

DATE_DIFF 함수는 두 datetime 사이의 차이를 구하는 함수인데, 차이의 기준을 지정할 수 있다.

-- end_date - start_date 구조임
SELECT DATE_DIFF(HOUR, start_date, end_date) FROM DUAL

YEAR, MONTH, DAY, WEEK, HOUR, MINUTE 등을 사용할 수 있다.

SYSDATE 함수는 현재 일자와 시간을 date 형식으로 반환하는 함수이다. 이를 원하는 형식으로 바꾸기 위해서 다음과 같은 작업을 수행할 수 있다.

SELECT TO_CHAR(SYSDATE, 'YYYYMMDD') FROM DUAL

8. 정규 표현식 함수

REGEXP_CONTAINS(value, regexp)는 value가 정규표현식 regexp와 부분적으로 일치하면 True를 반환한다.

REGEXP_EXTRACT(value, regexp, position, occurrence)REGEXP_SUBSTR와 같은 함수로, value에서 정규표현식 regexp와 일치하는 첫 번째 하위 문자열을 반환하고, 일치하는 항목이 없으면 Null을 반환한다. position은 optional 인자로, position이 지정되면 검색은 value의 이 위치에서 시작한다. position은 양의 정수여야 한다.

REGEXP_EXTRACT_ALL은 위 함수와 같은 효과이나 일치하는 value의 모든 하위 문자열 배열을 반환한다는 차이가 있다.

WITH email_addresses AS
  (SELECT 'foo@example.com' as email
  UNION ALL
  SELECT 'bar@example.org' as email
  UNION ALL
  SELECT 'baz@example.net' as email)

SELECT
  REGEXP_EXTRACT(email, r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.([a-zA-Z0-9-.]+$)')
  AS top_level_domain
FROM email_addresses;

+------------------+
| top_level_domain |
+------------------+
| com              |
| org              |
| net              |
+------------------+

REGEXP_REPLACE(value, regexp, replacement)는 regexp와 일치하는 모든 value 하위 문자열이 replacement로 바뀐 string을 반환한다.

WITH markdown AS
  (SELECT '# Heading' as heading
  UNION ALL
  SELECT '# Another heading' as heading)

SELECT
  REGEXP_REPLACE(heading, r'^# ([a-zA-Z0-9\s]+$)', '<h1>\\1</h1>')
  AS html
FROM markdown;

+--------------------------+
| html                     |
+--------------------------+
| <h1>Heading</h1>         |
| <h1>Another heading</h1> |
+--------------------------+

References

1) 프로그래머스 SQL 고득점 Kit
2) 참고블로그1
3) 참고블로그2
4) BigQuery문서

Comment  Read more

AB Test Sample Size 구하기

|

이번 포스팅에서는 AB Test를 진행할 때 거의 필수적으로 고려해야 하는 Sample Size를 구하는 과정에 대해 정리해본다.


AB Test Sample Size 구하기

1. Basic

귀무가설과 대립가설이 아래와 같다고 해보자.

[H_0: \mu = \mu_0]

[H_a: \mu > \mu_0]

실험의 특성을 고려하여 유의수준과 검정력을 설정하면 이를 통해 최소 Sample Size를 추정할 수 있다. 1 - 검정력인 $\beta$ 는 $\mu = \mu_a$, \mu_a > \mu_0$ 과 같이 $\mu$ 를 특정 값으로 지정하였을 때 구할 수 있다.

Reject Region(기각역)이 아래와 같이 정의된다고 하자. $\hat{\theta}$ 는 물론 추정량을 의미한다.

[RR = { \hat{\theta}: \hat{\theta} > k }]

$k$ 는 기각역이 시작되는 point이다.

2종 오류가 발생할 확률인 $\beta$ 는 대립가설이 참일 때 귀무가설을 적절히 기각하지 않을 확률을 의미한다. 이를 위 수식과 이어서 설명하면, 아래와 같이 표현할 수 있다.

[\beta = P(\hat{\theta} \leq k, \theta = \theta_a)]

위 사실들을 고려하여 다시 한 번 $\alpha$ 와 $\beta$ 를 구해보자.

[\alpha = P(\bar{Y} > k, \mu = \mu_0)]

[= P( \frac{\bar{Y} - \mu_0}{\sigma / \sqrt{n}} > \frac{k - \mu_0}{\sigma / \sqrt{n}}, \mu = \mu_0 )]

[= P(Z > \mathcal{z}_{\alpha})]

[\beta = P(\bar{Y} \leq k, \mu = \mu_a)]

[= P( \frac{\bar{Y} - \mu_a}{\sigma / \sqrt{n}} \leq \frac{k - \mu_a}{\sigma / \sqrt{n}}, \mu = \mu_a )]

[= P(Z \leq - \mathcal{z}_{\beta})]

위 2개 식에서 우리는 아래 사실들을 정리할 수 있다.

[\frac{k - \mu_0}{\sigma / \sqrt{n}} = \mathcal{z}_{\alpha}]

[\frac{k - \mu_a}{\sigma / \sqrt{n}} = -\mathcal{z}_{\beta}]

위 식을 $k$ 에 대해 정리하면 다음과 같다.

[k = \mu_0 + \mathcal{z}{\alpha} (\frac{\sigma}{\sqrt{n}}) = \mu_a - \mathcal{z}{\beta} (\frac{\sigma}{\sqrt{n}})]

따라서 위 식에서 $k$ 를 제외하고 $n$ 에 대해 다시 정리하면, Sample Size for an Upper-tail $\alpha$ - level Test를 얻을 수 있다.

[n = \frac{(\mathcal{z}{\alpha} + \mathcal{z}{\beta})^2 \sigma^2}{(\mu_a - \mu_0)^2}]

이 때 분모에서 제곱항 내부에 위치한 값을 Effect Size라고 표현하며, 실험에서 확인하고 싶은 유의미한 차이를 의미한다.

[\delta = \mu_a - \mu_0]

그리고 위 공식은 $n$ 이 충분히 클 때 성립한다.

2. Proportion Test

애플리케이션 상에서 AB Test를 할 때에는 비율의 차이를 metric으로 두는 경우가 많다. 예를 들어 새로운 모델을 배포하였을 때 Conversion Rate이 충분히 상승하였는지 알고 싶을 수 있다. 이 경우 귀무가설과 대립가설은 아래와 같다.

[H_0: p = p_0]

[H_a: p > p_0]

이 경우도 결국 똑같이 구할 수 있다. 신청 여부에 따른 분포이기 때문에 본 Test에서는 이항 분포가 사용된다. 그렇다면 분산은 아래와 같이 구할 수 있다.

[\sigma_0^2 = p_0 (1-p_0)]

새로운 모델에서의 목표 Conversion Rate을 $p_a$ 라고 하면 Effect Size는 $p_a - p_0$ 가 될 것이다. 그리고 표본 평균의 분산은 아래와 같이 구할 수 있다.

[\sigma^2 = p_0(1-p_0) + p_a(1-p_a)]

위 사실을 모두 종합하여 Sample Size를 구해보면 다음과 같다.

[n = \frac{(\mathcal{z}{\alpha} + \mathcal{z}{\beta})^2 (p_0(1-p_0) + p_a(1-p_a)) }{(p_a - p_0)^2}]

만약 아래와 같은 실험 설정이라면,

[\alpha = 0.05, 1 - \beta = 0.8, p_0 = 0.02, p_a = 0.022]

필요한 최소 Sample Size는 63215이다.


References

1) Mathematical Statistics with Applications(Denis D. Wackerly, …) Ch 10.4
2) Comparing Two Proportions – Sample Size

Comment  Read more

AB Test 기본

|

이번 포스팅에서는 AB Test에 대해 정리해본다. 사실 AB Test를 완벽히 설명하기 위해서는 정말 많은 자료와 깊은 고민의 경험이 필요하다. 본 글은 단지 필자가 생각을 정리하기 위해 작성한 글임을 밝혀둔다.


AB Test 기본

1. Background

AB Test는 간단히 말해서 User를 무작위로 2개의 그룹으로 구분한 뒤, 서로 다른 Action을 수행했을 때의 반응의 차이를 파악하는 분석 기법이다. Bayesian AB Test를 사용할 수도 있지만, 일반적으로는 Frequentist AB Test 관점이 더욱 자주 사용되는 것으로 보인다.

Null Hypothesis를 $H_0$, Alternative Hypothesis를 $H_1$ 이라고 한다면, 일반적으로 우리의 목표는 Null Hypothesis(귀무가설)를 기각하는 것이 된다. 만약 기각하지 못할 경우 우리는 귀무가설을 기각할 충분한 근거가 없다고 판단내려야 한다.

1종 오류는 귀무가설이 True임에도 우리가 실험을 통해 이를 기각하는 오류를 의미하며 $\alpha$ 라고 표기한다. 2종 오류는 반대로 귀무가설이 False임에도 우리가 이를 기각하지 않는 오류를 의미하며 $\beta$ 라고 표기한다. 그리고 귀무가설이 False인데 우리가 이를 잘 기각할 확률을 의미하는 검정력은 $1-\beta$ 로 계산한다.

이러한 가설을 기각할 것인가를 판단하는 기준으로 우리는 P-value를 사용하게 된다. P-value는 간단히 말해서 귀무가설 $H_0$ 이 True라고 가정하고 얻은 확률 분포에서 현재의 실험 결과가 나올 확률을 의미한다. 만약 이렇게 얻은 P-value가 0.01이라고 하면, 귀무가설이 True일 때 현재의 결과가 나올 확률이 1%란 이야기이므로, 우리가 일반적으로 사용하는 유의수준 0.05보다 한참 낮은 값이다. 따라서 보통 이 경우 귀무가설을 기각하고, 통계적으로 유의미한 차이가 있다고 결론 내리게 된다.


2. Netflix: What is an A/B Test?

이번 장에서는 AB Test에 대해 쉽게 풀어쓴 Netflix의 게시물을 소개한다. 원문 링크는 글 하단에 있는 참고 문헌 목록에서 확인할 수 있다.

위와 같이 2개의 선택지를 놓고 어떤 방식이 더 우수한 결과를 이끌어내는지 확인해 볼 수 있다. 좌측에 위치한 선택지가 기존 방식이라고 하면 이 방식의 UI를 경험할 고객 군을 Control Group이라고 지칭한다. 반면 새로운 선택지, 즉 우측에 위치한 방식의 UI를 경험할 고객 군은 Treatment Group이라고 한다.

위 사례 처럼 UI 관련 실험에서는 여러 변인들에 대해서도 살펴보아야 한다. 예를 들어 앱을 로딩하는데 시간이 더 걸린다거나, 네트워크 상태에 따라 영상의 질이 변하는 것은 아닌지 등을 반드시 체크해야 한다.

실험의 성과를 결정할 지표(metric)이 결정되었다면, 이 지표가 무엇을 의미하는지 명확히 파악해야 한다. 예를 들어 CTR을 metric으로 정했다고 하자. 단순히 CTR이 높다고 좋은 일일까? 어떤 고객들은 뒤집힌 영화의 제목을 보면서 제대로 보기 위해 컨텐츠를 클릭했을 수도 있다. 단순히 CTR이 높다고 좋은 실험이었다고 결론을 내려버리면 서비스를 제대로 이해하지 못한 것이라고 할 수 있다.

Netflix에서는 고객에게 주는 만족감을 더욱 잘 포착할 수 있는 일반적인 지표를 본다. 이를 member engagement with Netflix라고 표현할 수 있는데, 결국 이는 고객들이 Netflix를 이용하는 것을 취미로써 계속 선택하고 있는지에 관한 문제이다.

Holding everything else constant
무작위 추출로 2개의 그룹을 추출하였다 하더라도, 성과에 큰 영향을 줄 수 있는 여러 변인에 대한 점검은 필수적이다. Netflix에서는 멤버십의 길이, 컨텐츠의 선호도, 언어 등과 같은 요소들을 예시로 들고 있다. 오직 테스트하고자 하는 그 변수 하나만 달라야 한다.

It all starts with and idea
Netflix에서는 AB Test는 하나의 아이디어에서 출발한다고 강조하고 있다. 그것이 UI에 대한 변화일 수도 있고, 개인화 시스템이 될 수도 있고 가입 절차의 변화가 될 수도 있다. 이제는 익숙한 Top10 컨텐츠도 처음에는 테스트 가설에서 시작했다고 한다. 이 아이디어에서 얻고자 했던 것은 2가지이다. popular 컨텐츠를 소개함으로써 보는 이로 하여금 공유된 경험을 느끼게 만들고자 했던 것이 첫 번째 목표이고, 다른 사람들이 자주 보는 컨텐츠를 같이 향유함으로써 같은 주제로 다른 사람들과 대화하고 싶어하는 인간의 내재된 욕망을 이끌어 내는 것이 두 번째 목표였다고 한다.

특히 2번째 목표가 인상적인데, 주위를 슬쩍 둘러봐도 Netflix에서 히트작이 나오면 그 작품에 대해 이야기를 주고 받는 사람들을 자주 목격할 수 있다.

그렇다면 이 가설이 맞다는 것을 어떻게 증명했을까? 메인 지표로는 멤버십 구독이 설정되었는데 왜냐하면 이것이 결국 engagement with Netflix과 큰 상관 관계가 있기 때문이다. 그 외에도 Top10 목록에 나타난 제목 및 다른 부분에 대한 viewing과 같은 보조 지표 역시 검증의 기준으로 활용되었다.

마지막으로 모든 아이디어가 성공으로 이어지는 것이 아니기 때문에 일종의 guardrail이 설정되었다. 실험 역시 중요하지만 이것이 고객 경험의 질의 악화로 이어져서는 안된다. 따라서 실험 때문에 Treatment Group에 있는 고객들이 비정상적으로 서비스 센터로 연락을 더 많이 한다거나 하는 현상이 발생하면 이를 확인하여 조치하도록 하였다.

본 글을 통해 Netflix의 AB Test 철학에 대해 알 수 있었다.


3. 생각할 수 있는 문제점

다시 기본 주제로 돌아온다. 1장에서 설명한 수준으로 AB Test를 진행하고 의사 결정을 내린다면 운이 따르지 않을 경우 좋지 않은 결과를 떠안아야 할 수도 있다. 본 Chapter에서는 AB Test에서 발생할 수 있는 여러가지 문제점에 대해 짚어보고 해결책에 대해 논의한다. Reference에 링크를 첨부한 여러 블로그와 Medium에 있는 Tech Company의 포스팅 및 몇 가지 교재를 참조하였음을 밝힌다.

일단 첫 번째로 생각할 수 있는 부분은 유의 수준에 관한 것이다. 0.05라는 수치는 적절한 것일까? 이 부분에 대해서는 꽤 많은 챌린지가 발생했으며 일부 통계학자들은 0.05 대신 최소한 0.005 수준의 P-value를 고려하는 것이 적절하다고 이야기하고 있다.

그 다음으로는 실험 기간과 표본 수에 대해 생각해볼 수 있다. 실험 결과에서 같은 수준의 차이를 보였다 하더라도, 더 오랫동안 실험하거나, 더 많은 수의 표본을 모으면 표본 오차가 작아지기 때문에 더 작은 P-value를 얻을 가능성이 높아진다. 실제 서비스에서 AB Test를 실행한다고 하면 이 또한 비용이 들어가는 것이므로 실험 기간에 대해서는 신중해질 수 밖에 없다. 따라서 무작정 더 오래 실험해야 합니다! 라고 이야기할 수는 없는 것이다.

그래서 반드시 고려되어야 할 부분이 바로 Effect Size이다. P-value는 표본의 크기가 커질 때도 작아지지만 Effect Size가 커질 때도 작아진다. Effect Size의 수치 자체도 중요하지만, 이 수치가 실제 서비스에서 어느 정도의 의미를 갖는지를 명확히 판단하는 것이 더욱 중요하다.

  • 0.1%의 CTR 향상은 성공이라고 말할 수 있는가?
  • 이번에 새롭게 모델을 배포했다면 어느 정도의 향상을 기대해야 성공이라고 이야기할 수 있는가?

위와 같은 형식의 질문에 답해보고, 이를 바탕으로 실험 계획을 수립하고 이후의 분석 및 action item 선정 작업을 진행해야 할 것이다.


4. 적절한 실험 규모

목표하는 Effet Size, 1종 오류 및 2종 오류의 수준 등을 모두 고려하면 적절한 실험 규모를 산정할 수 있다. 2가지 오류 모두 고려하는 것이 마냥 쉽지만은 않다. 효과/차이가 없다는 것을 판별하는 것도 중요하고 유의미한 효과가 존재할 때 이를 잘 뒷받침하는 것 또한 놓칠 수 없는 것이 AB Test의 본질이다.

본 내용에 대해서는 더욱 자세한 설명이 필요하므로 추후 다른 글에서 정리해보도록 하겠다.


References

1) 블로그: AB Test에서 p-value에 휘둘리지 않기
2) Netflix: What is an A/B Test?

Comment  Read more

Apache Spark 기본

|

이번 포스팅에서는 Apache Spark의 기본에 대해 정리해본다. 평소에 자주 사용하지만 정확히 숙지하지 못한 부분들을 정리하는 데에 목적이 있다.


Apache Spark 기본

1. 기본 개념

Spark는 여러 컴퓨터를 모은 클러스터를 관리하는 framework다. 이 때 클러스터는 Hadoop Yarn, Mesos 같은 클러스터 매니저가 관리하게 되며, Spark는 클러스터의 데이터 처리 작업을 관리 및 조율하게 된다.

사용자는 이 클러스터 매니저에 애플리케이션을 제출하고, 클러스터 매니저는 애플리케이션 실행에 필요한 자원을 할당하게 된다.

Spark 애플리케이션Driver 프로세스와 다수의 Executor 프로세스로 구성된다. 이 때 Driver 프로세스는 메인 함수를 실행하고, 전반적인 Executor 프로세스의 작업과 관련된 분석, 배포 및 스케쥴링을 담당하게 된다. Executor 프로세스는 Driver 프로세스가 할당한 작업을 수행한 후 다시 보고한다.

즉, 이름에서도 알 수 있듯이 Driver 프로세스는 계획을 수립하고 명령을 내리는 컨트롤 타워의 역할을 하고, Executor 프로세스는 이를 수행하는 일꾼의 역할을 한다는 것을 알 수 있다.

아래 그림은 클러스터 매니저가 물리적 머신을 관리하고 스파크 애플리케이션에 자원을 할당하는 방법을 나타낸다. 사용자는 각 Executor에 할당할 노드 수를 지정할 수 있다.

Spark는 파이썬, 자바, 스칼라, R을 지원한다. 예를 들어 파이썬을 사용한다고 하면 Spark는 사용자를 대신하여 파이썬으로 작성한 코드를 Executor의 JVM에서 실행할 수 있는 코드로 변환한다. 이 과정에도 역시 비용이 든다. 따라서 만약 사용자가 큰 비용을 수반하는 UDF를 생성해야 한다면 스칼라나 자바로 이를 구현하는 것이 좀 더 효율적이다.

SparkSession은 앞서 기술한 Driver 프로세스이다. 사용자가 정의한 처리 명령을 클러스터에서 실행하게 되는데, 하나의 SparkSession은 하나의 Spark 애플리케이션에 대응한다.

그리고 SparkSession의 SparkContext는 클러스터에 대한 연결을 나타낸다. 이 SparkContext를 통해 RDD 같은 저수준 API를 사용할 수 있다.

Spark에는 3가지의 구조적 API가 존재한다. DataFrame, Dataset, SparkSQL이 그 대상들이다. 다만 Dataset의 경우 파이썬이나 R에서는 지원되지 않는다. 이들 개념에 대해서는 다음 장에서 살펴본다. Spark DataFrame이나 Dataset은 수 많은 컴퓨터에 분산 저장된다.

파티션은 모든 Executor가 병렬로 작업을 수행할 수 있도록 chunk로 데이터를 분할한 것이다. 파티션은 또한 데이터가 클러스터에서 물리적으로 분산되는 방식을 나타내고, 만약 파티션이 1개면 Executor가 많아도 병렬성은 1이 된다.

기본적으로 DataFrame과 같은 Spark의 핵심 구조는 immutable하고, 변경하고 싶다면 명령을 내려줘야 한다. 이 명령을 Transformation이고, 이 개념은 Spark에서 비즈니스 로직을 표현하는 핵심 개념이다.

Transformation에는 다음과 같이 2종류가 존재하고, Wide Dependency를 가지는 Transformation은 Shuffle 비용을 발생시키기 때문에 주의가 필요하다.

이렇게 만든 Transformation은 바로 실행되는 것은 아니다. 왜냐하면 Spark는 Lazy Execution이라는 규칙을 갖고 있기 때문이다. Spark는 특정 연산 명령이 내려지면 데이터를 즉시 수정하는 것이 아니라 실행 계획을 생성하고, 마지막 순간 전에 원형 Transformation을 간결한 물리적 실행 계획으로 컴파일한다. 그리고 이 과정 속에서 Spark는 사용자가 쉽게 할 수 없는 최적화를 수행하여 작업의 효율성을 높인다.

Action은 실제 연산을 지시하는 명령이다. 예를 들어 count 명령은 DataFrame의 전체 레코드 수를 반환하는 Action이다. 이렇게 Action을 지정하면 Spark Job이 시작된다. 참고로 Spark 애플리케이션은 1개 이상의 Job으로 구성된다.


2. 구조적 API

구조적 API는 데이터 흐름을 정의하는 기본 추상화 개념이고 앞서 언급하였듯이 DataFrame, Dataset, SparkSQL로 구성된다. 구조적 API는 비정형 로그파일부터 정형적인 Parquet 파일까지 다양한 유형의 데이터를 처리할 수 있다.

DataFrame과 Dataset의 공통적인 특성은 다음과 같다.

  • 잘 정의된 Row, Column을 갖는 분산 테이블 형태의 컬렉션
  • 결과를 생성하기 위해 어떤 데이터에 어떤 연산을 적용해야 하는지 정의하는 지연 연산의 실행 계획
  • 불변성을 지님
  • DataFrame에 액션을 호출하면 Spark는 Transformation을 실제로 실행하고 결과를 반환함
  • DataFrame을 사용하면 Spark의 최적화된 내부 포맷을 이용할 수 있음

마지막 부분이 중요한데, 사실 Spark는 사용자 모르게 (물론 살펴보면 알 수 있다.) 복잡한 연산 과정에 대해 수많은 최적화를 수행한다. 이 때 구조적 API인 DataFrame을 사용하면 Spark의 최적화 과정은 더욱 빛을 발하게 된다. 이는 결국 정확히 어떤 작업을 하는지 인지하고 있을 때만 저수준 API인 RDD를 호출해야 하며, 명확한 목적과 설계가 잡혀있지 않는 이상 웬만하면 DataFrame 수준에서 작업을 진행하는 것이 좋다는 뜻이다.

구조적 API의 실행 과정에 대해 살펴보자.

일단 구조적 API를 이용하여 코드를 작성한다. Spark는 이를 논리적 실행 계획으로 변환한다. 이후 논리적 실행 계획을 물리적 실행 계획으로 변환하며 이 과정에서 최적화를 할 수 있는지 확인한다. 최적화는 Catalyst Optimzer에 의해 이루어진다. Spark는 알아서 실행 과정 속에서 최적화를 해주는 것이다!

이후 Spark는 클러스터에서 물리적 실행 계획, 즉 RDD 처리를 실행한다. 물리적 실행 계획은 일련의 RDD와 Transformation으로 변환되는데, Spark는 구조적 API로 정의된 쿼리를 RDD Transformation으로 컴파일한다. 이 과정 때문에 Spark는 컴파일러로 불린다.

사용자는 DataFrame과 같은 구조적 API에 기반하여 코드를 짜게 될 것이다. 그렇게 하면 이 추상화된 개념을 놓고 Spark는 실제 실행 계획을 수립하면서 최적화를 하고 최종적으로 결과물을 반환하게 된다.


3. 저수준 API

Spark에서는 2가지 저수준 API를 지원한다. 하나는 앞선 장에서 언급한 RDD이다. 다른 하나는 Broadcast 변수와 Accumulator와 같은 분산형 공유 변수를 배포하고 다루기 위한 API이다.

3.1. RDD

RDD는 resilient distributed dataset의 약자로, 다수의 서버에 걸쳐 분산(distributed) 방식으로 저장된 데이터 요소들의 집합을 의미하며, 병렬 처리가 가능하고 장애가 발생해도 스스로 복구 가능(resilient)하다.

Dataset과 DataFrame이 존재하기 전에는 RDD 자체로 많이 사용하였지만 현재는 이 자체로는 특수한 목적 외에는 잘 사용되지는 않는다. 물론 사용자가 작성한 Dataset, DataFrame 코드는 앞서 기술하였듯이 실제로는 RDD로 컴파일되어 수행된다. RDD를 사용하면 Spark의 여러 최적화 기법을 사용할 수 없기 때문에 세부적인 물리적 제어가 필요할 때만 RDD를 명시적으로 사용해야 한다.

3.2. 분산형 공유 변수

브로드캐스트 변수는 불변의 값을 closure 함수의 변수로 캡슐화하지 않고 클러스터에서 효율적으로 공유하게 해준다. 모든 워커 노드에 큰 값을 저장하여 재전송 없이 많은 Spark 액션에서 재사용이 가능하다. 모든 Task마다 직렬화할 필요 없이 클러스터의 모든 머신에 캐시하게 된다.

브로드캐스트 변수는 다방면에서 활용될 수 있는데, 이후에 설명할 broadcast join에서 그 효과를 직관적으로 이해할 수 있을 것이다.

어큐멀레이터는 Spark 클러스터에서 row단위로 안전하게 값을 갱신할 수 있는 변경 가능한 변수를 제공한다. 이를 디버깅용이나 저수준 집계용으로 사용할 수 있다. 예를 들어 파티션 별로 특정 변수의 값을 추적하는 용도로 사용할 수 있고 병렬 처리 과정에서 더욱 효율적으로 사용할 수 있음. 기본적으로 Spark는 수치형 어큐멀레이터를 지원하나 사용자 정의 형태도 가능하다.

어큐멀레이터에 이름을 지정하면 이 실행 결과는 Spark Web UI에 표시되기 때문에 모니터링하기에 매우 편리하다.


4. 애플리케이션, Job, Stage, Task 개념

이전에 Transformation과 Action의 개념에 대해서 설명하였다. Action이 실제로 어떤 연산을 하는 작업이라고 하였다. 이 Action 하나당 1개의 Job이 존재한다. 그리고 이 Job은 일련의 Stage와 Task들로 구성된다.

Stage는 다수의 머신에서 Task의 그룹이다. Task들은 모두 동일한 연산을 수행하게 되는데, 파티션 1개당 1개의 task가 주어진다. 그리고 Executor는 1개의 파티션에 대해 작업을 처리하게 된다. 그림으로 보면 아래와 같다.

위 설명처럼, 각 Stage는 Shuffle이 발생했는지의 여부에 따라 구분되게 된다. 참고로 Shuffle은 각 노드 사이에 데이터의 이동이 발생하는 것을 의미한다.

그리고 최종적으로 이러한 여러 Job들이 모여 전체 Spark 애플리케이션을 구성한다.


5. Join

Spark에서의 Join을 찾아보면, Shuffle Join, Broadcast Join, Sort Merge Join이 나올 것이다. 그런데 Shuffle Join은 Spark2.3에서부터 Sort Merge Join으로 대체되었다. (설정으로 변경할 수 있다.) Sort Merge Join이 Shuffle Join에 비해 클러스터내 데이터 이동이 더 적다고 알려져 있다.

Sort Merge Join은 사용자가 일반적으로 DataFrame에 join을 행하면 가장 많이 일어나는 join이다. 먼저 파티션을 정렬한 후 이 정렬된 데이터를 병합하면서 join key가 같은 row를 join하게 된다. 먼저 정렬을 하기 때문에 데이터가 심하게 뒤섞여 있거나 skewed되어 있으면 이 비용이 상당히 크다.

Broadcast Join은 작은 테이블을 큰 테이블에 join할 때 사용된다. 작은 테이블을 클러스터 전체 worker node에 복제하고 이를 캐시하여 계속 사용하는 것인데, 한 번 대규모 통신이 발생하긴 하지만 (이 때의 비용은 클 것이다.) 이후 추가적인 통신이 없기 때문에 굉장히 유용하고 실제로 크게 속도를 향상시켜준다. 보통 이러한 상황에서 Spark는 알아서 Broadcast Join으로 계획을 수립한다. (구조적 API를 사용할 때)

Join 수행 시 시간이 너무 오래걸린다고 생각이 되면 아래 Tip들을 참고하면 좋다.

  • Join될 파티션들이 최대한 같은 곳에 있어야 한다.
  • DataFrame의 데이터가 균등하게 분배되어 있어야 한다. (not skewed)
  • 병렬 처리가 이루어지려면 일정한 수의 고유 key가 있어야 한다.

위와 같은 과정에 대해서 좀 더 자세한 설명이 필요하다면 아래 링크를 참조하면 좋다.


6. Spark Execution 최적화

이전에 기술하였듯이 Shuffle이 발생하면 Stage는 새로 생성하고, 각 Stage는 파티션 개수에 따라 여러 Task로 쪼개진다. 이 Task의 수행 시간은 아래와 같이 또 쪼개볼 수 있다.

Scheduler Delay + Deserialization Time + Shuffle Read Time(Optional) + Executor Runtime + Shuffle Write(Optional) + Result Serialization Time + Getting Result Time

Spark Web UI를 켜셔 Job 모니터링을 하면 자주 볼 수 있는 용어들이다.

Scheduler Delay에 대해 알아보자. Spark는 Data Locality에 크게 영향을 받는다. 데이터가 실제 위치한, 로드된 곳이라고 생각하면 되는데 Spark는 이 데이터 전송을 최소화하기 위해 Task를 데이터와 최대한 가깝게 하여 수행하려고 한다. Data Locality는 아래와 같이 5개로 구분된다.

Priority Locality Level 설명
1 PROCESS_LOCAL 데이터가 실행되는 코드와 같은 JVM에 있음
2 NODE_LOCAL 데이터가 같은 node에 있음
3 NO_PREF 특별히 locality preference가 없는 곳에 데이터가 존재함
4 RACK_LOCAL 데이터가 같은 Rack이지만 다른 서버에 존재하여 네트워크를 통해 전송이 필요함
5 ANY 데이터가 같은 RACK에 있지도 않음

상위에 있을수록 좋은 것인데, 만약 Data Locality가 PROCESS_LOCAL이라면 Task는 굉장히 빠르게 진행될 것이다. 아래에 있는 레벨일 수록 실제로 Task를 수행할 때까지의 시간이 길어지고, 이를 Scheduler Delay라고 한다. 즉, 네트워크 전송 비용이 그만큼 사용된다는 것이다.

만약 가용 Executor가 Data Locality를 만족하지 못하면 timeout까지 그냥 기다리게 되기 때문에 spark.locality.wait 파라미터를 조정할 수 있다. 더 나은 Locality를 위해 더욱 긴 waiting time을 설정하거나 waiting time을 0으로 바꿔버림으로써 이전 단계들을 건너뛸 수도 있다.

이 Data Locality가 낮은 레벨에 속해있고, Shuffle 대상 데이터의 크기가 크다면 이후 Shuffle Read/Write Time은 크게 증가하게 될 것이다. Executor Run Time은 data read/write time, CPU execution time, Java GC time으로 구성된다.

Task의 수행 시간에 대해서는 알아보았고, 그렇다면 좀 더 빠르게 작업이 진행되도록 튜닝을 하려면 어떻게 해야할까? 이 부분은 Spark 완벽 가이드 책과, IBM 그리고 Databricks의 포스팅을 참고하여 요약 정리한다.

먼저 간접적인 성능 향상 기법에 대해 정리한다. 일단 구조화 API를 적극 사용해야 한다. 이전에도 언급하였듯이 구조적 API를 사용하면 Spark의 여러 장점들을 그대로 사용할 수 있다. RDD의 사용 영역은 최소화하는 것이 좋다. 특히 Python으로 RDD 코드를 실행하면 JVM과 Python 프로세스를 오가는 많은 데이터를 직렬화/역직렬화해야 해서 많은 비용이 수반된다.

그리고 다음은 Data Locality를 확인해보는 것이다. 지금 수행하고 있는 Task에 대한 Data Locality가 과연 최선인지 파악해보아야 한다. 다음으로는 Shuffle 설정이다. 이 부분이 상당히 중요하다. Shuffle은 일반적으로 큰 네트워크 비용을 요구하기 때문에 지양되곤 한다. 불필요한 Shuffle은 당연히 피하는 것이 좋다. 그러나 애초에 Shuffle이 존재하는 이유는 데이터를 재 분배하여 더욱 효율적인 처리를 가능하게 만들기 위해서이다. 즉, 잘만 사용하면 성능 향상을 이끌어낼 수 있다.

Data Skeweness가 발견되었거나 파티션 수가 너무 적으면 Shuffle이 도움이 된다. 일단 특정 파티션에만 데이터가 몰려있으면 그 파티션에서 task를 수행하는 Executor의 부담이 커지기 때문에 다른 Executor들의 작업이 끝나도 전체 Stage가 끝나지 않는 현상이 발생하게 된다. 또 애초에 파티션 수가 너무 적으면 작업을 수행하지 않는 Executor가 발생할 수도 있기 때문에 파티션 수 조정이 도움이 되는 경우가 많다. 예를 들어 기본 파티션의 수가 200개이기 때문에 task가 200개로 쪼개져 있을 때, 가용 node의 수가 130개라고 하면, 모든 node의 작업이 끝난 후에 오직 70개의 node만이 2번째 작업을 시작하게 될 것이다. 이는 분명 효율을 다소 낮추는 요인이 된다.

추가적으로 Shuffle을 수행할 때는 Output 파티션 당 최소 수십 메가바이트의 데이터는 포함되는 것이 좋으며, 애플리케이션이 실행 중에 메모리를 너무 많이 사용하거나 GC collection이 너무 자주 수행되는 것은 아닌지 확인해보는 것이 좋다.

직접적인 성능 향상 기법에 대해 알아보자. Executor 당 할당되는 CPU 코어의 수, 그리고 CPU 코어에 할당되는 task 수의 재조정을 통해 병렬화를 향상시킬 수 있다.

파티션 재분배(repartition)는 앞서 언급하였듯이 Shuffle을 수반하지만 데이터가 클러스터에 균등하게 분배되므로 Job의 전체 실행 단계를 최적화할 수 있다. 그리고 만약 Shuffle 없이 파티션의 수를 줄이고 싶다면 Coalesce 메서드를 통해 동일 노드의 파티션을 하나로 합칠 수 있다. 구조화 API 상태에서 repartition을 수행하면 생각보다 괜찮은 성능 향상을 보이는 경우가 많다.

현재의 파티션 기준을 변경할 수 있는데, 특정 칼럼을 기준으로 바로 설정할 수도 있고 사용자 정의 파티셔닝을 사용할 수도 있다. 사실 이 부분은 직접 반영해본 적은 없는데, 사용자 정의 파티션 함수를 생성한 뒤 이를 파티션의 기준으로 삼을 수 있다고 한다. 잘 제어하면 skewed된 데이터를 균등 분배할 수 있다.

이론적인 부분에 대해서는 정리를 마쳤고, 개인적으로 Spark Web UI에서 자주 모니터링 하는 항목들에 대해 간략히 설명하고 마치도록 하겠다.

구분 설명
Shuffle Read Size Stage의 시작 단계에서 Executor에 있는 read serialized data의 크기
Shuffle Write Size Stage의 끝 단계에서 Executor에 있는 written serialized data의 크기
Shuffle Spill Memory 메모리에 있는 deserialized된 형태의 데이터의 크기
Shuffle Spill Disk spill한 후 disk에 있는 serialized된 형태의 데이터의 크기
Peak Execution Memory shuffle/aggregation/join 동안 생성된 내부 데이터 구조에 의해 차지하는 메모리 크기

Shuffle Read Size부터 유심히 보게 된다. Shuffle이 발생했을 때 얼마나 많은 데이터에 대해 네트워크 전송 비용이 들어가는지 가늠할 수 있기 때문이다. Shuffle Spill Memory/Disk는 전체 task가 끝난 후에 집계되며 언제나 Spill Memory > Spill Disk 관계이다. Peak Executiom Memory는 앞서 설명한 accumulator 변수인데, 이 값은 Task 내에서 생성된 모든 데이터 구조의 peak size의 총합과 거의 일치한다. 따라서 내가 다루고 있는 데이터의 전체 size에 대해 추정해볼 수 있다.


7. Apache Arrow

Apache Arrow는 Spark에서만 쓰이는 라이브러리는 아니지만, Spark에서 대단히 중요한 역할을 한다. in-memory columnar 데이터 포맷으로 JVM과 Python 프로세스 사이의 효율적인 데이터 전송 및 변환을 수행하는데, 메모리 공유를 통해 빠른 변환을 가능하게 한다. 또한 Tensorflow 및 Pytorch와도 고성능 데이터 교환 수단을 지원하기 때문에 만약 Spark 2.3.0 이상의 버전을 사용하고 있다면 거의 필수적으로 사용해야 하는 라이브러리이다. Arrow가 설치되어 있으면 효율적인 Vectorized UDF인 Pandas UDF를 사용할 수 있다. Pandas UDF를 사용하면 직렬화 overhead가 거의 발생하지 않기 때문에 속도를 굉장히 향상시킬 수 있다.


References

1) Spark 완벽 가이드 by 빌 체임버스, 마테이 자하리아
2) 참고 블로그
3) 참고 블로그
4) 참고 Medium글
5) 참고 Medium글
6) IBM Docs

Comment  Read more