AWS DDB의 데이터를 지표화 해보도록 노력해보기. DDB -> S3 -> Athena
Intro
- AWS 로 지표관련을 진행하면서 알게된 사항에 대해 정리해 보기.
- AWS 콘솔이 은근 바뀌어서 이제는 다를 수도 있지만, 일단은 여기까지만.
1. DDB 에서 S3 로 데이터 이전하기
- IAM role 생성 필요.
- 이 부분은 잘 모르니 pass
- 아마 Permissions Policies section 의 AWSGlueServiceRoe, AmazonDynamoDBFullAccess, AmazonS3FullAccess 등이 필요한듯?
- DDB 에서 S3 로 데이터 추출하기
- step1. Glue Database 생성하기.
- AWS Glue console 로 이동
- Data Catalog 에서 Database 선택후 Add database 로 추가하자
- {database_name}_ddb 와 같은 식으로 이름을 지어본뒤 Create database 로 생성하자.
- step2. Glue Crawler 생성하기 (source 는 DDB, destination 은 Glue)
- AWS Glue console 에서
- Data Catalog 에서 Crawlers 선택후 Create crawler 로 추가하자
- {tableName_ddb_glue_crawler} 와 같은 크롤러 이름으로 만들어 보자 ( {tableNAme}은 S3 으로 내보내려는 DynamoDB 의 테이블 이름으로 해보자. )
- 데이터가 아직 Glue 테이블에 매핑되지 않았으므로 Add a data source 를 실행하자.
- Drop-down 메뉴에서 Data souce 를 DynamoDB 를 선택하자.
- S3 로 내보내려는 DynamoDB 의 테이블 이름을 지정하자
- Glue, S3, DynamoDB 에 접근하기 위해 이전에 생성한 IAM role 을 선택하자.
- target database 로 이전에 생성한 Glue Database 를 선택하자
- Crawler schedule 를 On-demand 로 설정하자. (추후 스케줄을 바꿀예정)
- Create crawler 로 crawler 를 생성하자
- step3. Glue table 생성을 위해 Crawler 를 수행하기.
- AWS Glue → Data Catalog → Crawlers
- step2 에서 Run crawler 로 생성했던 crawler 를 run 하자.
- complete 되면 Glue table 이 생성이 된다.
- AWS Glue → Data Catalog → Databases
- 원하는 table 을 database 에서 선택한뒤
- list 에서 최근 생성된 항목을 선택해서 스키마 정보가 이상없는지 확인해보자.
-
step4. S3 Bucket 생성하기.
- step5. scheduled Glue ETL 생성하기 (소스는 Glue, destination 은 S3)
- AWS Glue → ETL Jobs
- Visual with a source and target 를 선택한뒤
- Source 는 AWS Glue Data Catalog 를 선택하고, Target 은 Amazon S3 로 선택하자
- Data source
- Data catalog 선택후 이름을 지정하고
- DynamoDB source 에서 Choose from the AWS Glue Data Catalog 를 선택하자
- 많은 예시들이 Choose the DynamoDB table directly 를 선택해서 진행하는데, 이렇게 하면 기본적으로 ddb 의 pk, sk 등만 옮겨진다, 일일이 하나하나 지정해줘야지 다른 컬럼들을 복사할수가 있다.
- Database 에서 이전에 생성한 것으로 지정하자.
- Table 에서 이전한 테이블 이름을 선택하고.
- Transfrom
- S3 로 내보내고 싶지 않는 항목이 있으면 여기서 제거하자.
- 최근에 업데이트로 자동으로 선택되지 않는듯하다. 수동으로 선택해서 추가하자.
- Change Schema 를 추가하자?
- 최근에 업데이트로 자동으로 선택되지 않는듯하다. 수동으로 선택해서 추가하자.
- S3 로 내보내고 싶지 않는 항목이 있으면 여기서 제거하자.
- Data Target
- Format(JSON), Compression Type 등을 지정하자
- Format, Compression Type 별 차이는 모르겠….
- https://medium.com/@autumn.bom/aws-athena-convert-to-parquet-with-snappy-89c93af3c8c0 이 자료보면 도움이 좀 될듯하다.
- Format, Compression Type 별 차이는 모르겠….
- Format(JSON), Compression Type 등을 지정하자
- 네비게이션 바에서 Job detail 을 클릭해서 IAM role 등을 설정 후
- 꼭 Save 후 Run 을 하자. (이건 step6. 이후 진행하자. )
- 현재 화면에 나와있는 세팅으로 Run 되지 않는다. 꼭 Save 하자.
- job 이 실행완료가 되면 위에서 설정한 S3 에 데이터가 잘 보일 것이다.
- step6. Script 수정하기.
- batch 가 실행되는 시점의 날짜별로 파티션을 나누기 위해 아래 작업을 진행함
import datetime
.
.
# 현재 날짜 가져오기
current_date = datetime.datetime.now().strftime('%Y-%m-%d-%H')
print(f"Current date is: {current_date}")
glueContext.get_logger().info(f"Current date is: {current_date}")
year, month, day, hour = current_date.split("-")
print(f"Current date year is: {year}")
glueContext.get_logger().info(f"Current date year is: {year}")
.
.
//ApplyMapping 에 아래 추가.
("year", "string", year, "string"),
("month", "string", month, "string"),
("day", "string", day, "string"),
("hour", "string", hour, "string"),
.
.
.
// 아래와 같은 형식으로 s3 path 지정,
connection_options={
"path": f"s3://.../.../ddb-to-s3/year={year}/month={month}/day={day}/hour={hour}/",
"partitionKeys": [],
},
- 스크립트 수정후 run 하자.
- 실행한 이후엔 항상 s3 용 glue crawler 를 돌려야함.!!
- https://docs.aws.amazon.com/ko_kr/glue/latest/dg/incremental-crawls.html 이런 글이 있는데, 이거 더 확인해봐야 할듯.
- 참고로, 이제는 쿼리 날릴때 꼭 where 문에 year, month, hour 등을 넣어야함
2. S3 에서 Athena 로 Query 하기.
- step1. Glue Database 생성하기
- AWS Glue → Data Catalog → Databases 에서 Add database
- {databseName}_s3 같은 이름을 지은후 Create 하자.
- step2. glue crawler 생성하기 (source 는 S3, destination 은 Glue Table)
- AWS Glue → Data Catalog → Crawlers 에서 create Crawler 하기
- {tableNAme}_s3_glue_crawler 같은 이름을 짓기
- Data source configuration 에서 Not yet 으로 설정후 data source 추가하기.
- Drop down 으로 Data source 는 S3로 지정후 데이터를 읽을 S3 path 등을 지정하자.
- IAM role 지정하고 Target database 는 위에서 생성한 Glue Database 로 하고 crawler schedule 는 Ondemand 로 지정후 Create crawler 로 하면!
- step3. Crawler 를 실행해 Glue table 생성하기
- AWS Glue → Data Catalog → Crawlers 에서 생성했던 crawler 를 선택해서 Run 시키자.
- crawler 가 한번 성공적으로 runs 되면 glue table 가 생성될 것이다. (스키마 확인해 보기)
- AWS Glue → Data Catalog → Databases 에서 database 를 클릭해서 최근생선된 테이블을 확인해 보자.
- step4. Query 결과를 담을 S3. 생성하기.
- Athena 에서 query 한 결과물을 받을 s3 를 따로 만들자.
- step5. crawler 에 의해 생성한 Glue table 을 Athena 로 쿼리하기.
- 네비게이션 바에서
- Data source 는 AwsDataCatalog 로 하고
- Database 는 위에서 {databaseNAme}_s3 등으로 생선한 것으로 고르고.
- 쿼리를 때리면 실행이 될 것이다
- 네비게이션 바에서
3. DDB 에서 S3 로 데이터 좀더 잘 이전하기
-
위의 예시에서 DDB 를 S3로 이전할때 URI가 항상 동일한 문제가 있다. 여러가지로 확인해본 결과 visual ETL로는 되지 않는것을 확인
- Step1.
- AWS Glue → ETL jobs → Visual ETL 에서 생성했던 job 을 선택하자.
- Step2.
- Visual 옆의 Script 를 선택해서 로직을 추가하자.
- Script 를 건들면 Visual 은 사라져 버린다.. 참고하세요.
- 아래와 같은 로직으로 추가해서 실행할때마다 S3 path 를 다르게 지정할수 있다.
import datetime
...
# Script generated for node S3 bucket
current_datetime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
output_path = f"s3://.../.../ddb-to-s3/{current_datetime}/"
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
frame=ApplyMapping_node2,
connection_type="s3",
format="json",
connection_options={
# "path": "s3://.../.../ddb-to-s3/", # 기존 로직.
"path": output_path,
"partitionKeys": [],
},
transformation_ctx="S3bucket_node3",
)
job.commit()
4. 특정 주기로 Query 해보기.
- 몇몇 가지 방법이 있는듯 하지만. SDK 를 이용해서 하는 법이 나에겐 가장 용이해 보임
- https://repost.aws/knowledge-center/schedule-query-athena
- 괜찮은 설명이 없어 우선 Java SDK를 이용해서 Athena 로 Query 하는거 진행 해봄.
- step1.
- Athena 에 접근하기 위한 client 생성
- step2.
- query 실행해서 execution ID 받아오기
- step3.
- query 실행이 완료될때까지 기다려서 결과값 가져오기
// https://docs.aws.amazon.com/athena/latest/ug/code-samples.html 이거 참고해서 구현중.
int CLIENT_EXECUTION_TIMEOUT = 100000;
String ATHENA_OUTPUT_BUCKET = "s3://output/athena-test-results/";
String ATHENA_SAMPLE_QUERY = "select * from \"test_origin_ddb\".\"ddb_to_s3\" limit 10;"; // change the Query statement to match your environment
long SLEEP_AMOUNT_IN_MS = 1000;
String ATHENA_DEFAULT_DATABASE = "test_origin_ddb"; // change the database to match your database
public void test_athena(){
log.info("\n\n ============================= test_athena start =============================\n\n");
String accessKeyId = ".......";
String secretAccessKey = "........";
String secToken = "......";
AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider.create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, secToken));
AthenaClient athenaClient = AthenaClient.builder()
.region(Region.AP_NORTHEAST_2)
.credentialsProvider(awsCredentialsProvider)
.httpClient(UrlConnectionHttpClient.builder().build())
.build();
String queryExecutionId = submitAthenaQuery(athenaClient);
try {
waitForQueryToComplete(athenaClient, queryExecutionId);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
processResultRows(athenaClient, queryExecutionId);
athenaClient.close();
}
public String submitAthenaQuery(AthenaClient athenaClient) {
try {
// The QueryExecutionContext allows us to set the database.
QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder()
.database(ATHENA_DEFAULT_DATABASE)
.build();
// The result configuration specifies where the results of the query should go.
ResultConfiguration resultConfiguration = ResultConfiguration.builder()
.outputLocation(ATHENA_OUTPUT_BUCKET)
.build();
StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
.queryString(ATHENA_SAMPLE_QUERY)
.queryExecutionContext(queryExecutionContext)
.resultConfiguration(resultConfiguration)
.build();
StartQueryExecutionResponse startQueryExecutionResponse = athenaClient.startQueryExecution(startQueryExecutionRequest);
return startQueryExecutionResponse.queryExecutionId();
} catch (AthenaException e) {
e.printStackTrace();
System.exit(1);
}
return "";
}
// Wait for an Amazon Athena query to complete, fail or to be cancelled.
public void waitForQueryToComplete(AthenaClient athenaClient, String queryExecutionId) throws InterruptedException {
GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder()
.queryExecutionId(queryExecutionId)
.build();
GetQueryExecutionResponse getQueryExecutionResponse;
boolean isQueryStillRunning = true;
while (isQueryStillRunning) {
getQueryExecutionResponse = athenaClient.getQueryExecution(getQueryExecutionRequest);
String queryState = getQueryExecutionResponse.queryExecution().status().state().toString();
if (queryState.equals(QueryExecutionState.FAILED.toString())) {
throw new RuntimeException("The Amazon Athena query failed to run with error message: " + getQueryExecutionResponse
.queryExecution().status().stateChangeReason());
} else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) {
throw new RuntimeException("The Amazon Athena query was cancelled.");
} else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) {
isQueryStillRunning = false;
} else {
// Sleep an amount of time before retrying again.
Thread.sleep(SLEEP_AMOUNT_IN_MS);
}
System.out.println("The current status is: " + queryState);
}
}
// This code retrieves the results of a query
public void processResultRows(AthenaClient athenaClient, String queryExecutionId) {
try {
// Max Results can be set but if its not set,
// it will choose the maximum page size.
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
.queryExecutionId(queryExecutionId)
.build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
for (GetQueryResultsResponse result : getQueryResultsResults) {
List<ColumnInfo> columnInfoList = result.resultSet().resultSetMetadata().columnInfo();
List<Row> results = result.resultSet().rows();
processRow(results, columnInfoList);
}
} catch (AthenaException e) {
e.printStackTrace();
System.exit(1);
}
}
private void processRow(List<Row> row, List<ColumnInfo> columnInfoList) {
for (Row myRow : row) {
List<Datum> allData = myRow.data();
for (Datum data : allData) {
System.out.println("The value of the column is "+data.varCharValue());
}
}
}
- 현재 우리가 사용하는 개발환경에 맞게 lamda + golang 으로도 해보기
- lamda. 제한이 5분 이긴 하지만.
- https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
- long-running worker 접근에 따라
- https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html
- 장기실행 작업자 접근법으로 가능은 하는듯.
- https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-task-state.html#amazon-states-language-task-state-activity
- lamda. 제한이 5분 이긴 하지만.
package echo
import (
"fmt"
"net/http"
"ororchat-api/plLogger"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/athena"
"github.com/labstack/echo/v4"
)
// https://gist.github.com/kartiksura/93160be1078648a14ec0ddc125c35546
func (handler AdminEchoInputAdaptor) AdminAthneaTest(ctx echo.Context) error {
fmt.Println("--------------")
plLogger.Error(ctx.Request().Context(), "AdminAthneaTest Start")
awscfg := &aws.Config{}
awscfg.WithRegion("ap-northeast-2")
// Create the session that the service will use.
sess := session.Must(session.NewSession(awscfg))
svc := athena.New(sess, aws.NewConfig().WithRegion("ap-northeast-2"))
var s athena.StartQueryExecutionInput
s.SetQueryString("select * from ddb_to_s3 limit 10")
var q athena.QueryExecutionContext
q.SetDatabase("nick_test_origin_ddb")
s.SetQueryExecutionContext(&q)
var r athena.ResultConfiguration
r.SetOutputLocation("s3://output/athena-test-results/test001/")
s.SetResultConfiguration(&r)
result, err := svc.StartQueryExecution(&s)
if err != nil {
fmt.Println(err)
return err
}
fmt.Println("StartQueryExecution result:")
fmt.Println(result.GoString())
var qri athena.GetQueryExecutionInput
qri.SetQueryExecutionId(*result.QueryExecutionId)
var qrop *athena.GetQueryExecutionOutput
duration := time.Duration(2) * time.Second // Pause for 2 seconds
for {
qrop, err = svc.GetQueryExecution(&qri)
if err != nil {
fmt.Println(err)
return err
}
if *qrop.QueryExecution.Status.State != "RUNNING" {
break
}
fmt.Println("waiting.")
time.Sleep(duration)
}
if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
var ip athena.GetQueryResultsInput
ip.SetQueryExecutionId(*result.QueryExecutionId)
op, err := svc.GetQueryResults(&ip)
if err != nil {
fmt.Println(err)
return err
}
fmt.Printf("%+v", op)
} else {
fmt.Println(*qrop.QueryExecution.Status.State)
}
return ctx.JSON(http.StatusOK, "{}")
}
- 개발 참고
- https://gist.github.com/kartiksura/93160be1078648a14ec0ddc125c35546
- https://frozenpond.tistory.com/211
- 아래 자료 더 봐야함
- https://medium.com/@autumn.bom/aws-athena-partitioning-a3ac1c40011d
- https://velog.io/@brillog/AWS-Glue%EB%A1%9C-DynamoDB-%EB%8D%B0%EC%9D%B4%ED%84%B0-S3%EC%97%90-%EC%A0%81%EC%9E%AC%ED%95%98%EA%B8%B0-ETL
- https://itecnote.com/tecnote/amazon-web-services-how-to-partition-data-by-datetime-in-aws-glue/
참고
- https://repost.aws/knowledge-center/schedule-query-athena
- https://stackoverflow.com/questions/76221483/aws-glue-job-job-bookmark-incremental-daily-extraction-from-rds-to-s3
- https://medium.com/@1ra4vi3/etl-pipeline-data-from-ddb-to-s3-and-query-data-in-s3-using-aws-athena-4b7b52d7c289
- https://docs.aws.amazon.com/athena/latest/ug/code-samples.html
- https://medium.com/@autumn.bom/aws-athena-convert-to-parquet-with-snappy-89c93af3c8c0
- https://medium.com/@autumn.bom/aws-athena-partitioning-a3ac1c40011d
- https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html