Skip to content

Commit a8d0302

Browse files
author
seolmin
committed
refactor: modify JobTasks for Google Cloud Storage
1 parent 09c5f9d commit a8d0302

5 files changed

Lines changed: 47 additions & 38 deletions

File tree

src/cloudforet/cost_analysis/connector/google_storage_collector.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,9 @@ def __init__(self, *args, **kwargs):
3232
project=self.secret_data["project_id"], credentials=self.credentials
3333
)
3434

35-
def get_cost_data(self, base_url):
36-
_LOGGER.debug(f"[get_cost_data] base url: {base_url}")
35+
def get_cost_data(self, bucket_name: str):
36+
_LOGGER.debug(f"[get_cost_data] bucket name: {bucket_name}")
3737

38-
bucket_name = self.secret_data.get("bucket_name")
3938
bucket = self.client.get_bucket(bucket_name)
4039
blob_names = [blob.name for blob in bucket.list_blobs()]
4140

@@ -47,7 +46,9 @@ def get_cost_data(self, base_url):
4746
csv_file_path = os.path.join(tmpdir, blob_name)
4847
blob.download_to_filename(csv_file_path)
4948
costs_data = self._get_csv(csv_file_path)
50-
_LOGGER.debug(f"[get_cost_data] costs count: {len(costs_data)}")
49+
_LOGGER.debug(
50+
f"[get_cost_data] costs count of {blob_name} : {len(costs_data)}"
51+
)
5152

5253
# Paginate
5354
page_count = int(len(costs_data) / _PAGE_SIZE) + 1

src/cloudforet/cost_analysis/connector/http_file_connector.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
import numpy as np
44
import chardet
55
import requests
6-
import google.oauth2.service_account
7-
8-
from spaceone.core.transaction import Transaction
96
from spaceone.core.connector import BaseConnector
107
from typing import List
118

src/cloudforet/cost_analysis/manager/cost_manager.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,37 @@
99

1010
_LOGGER = logging.getLogger(__name__)
1111

12-
_REQUIRED_FIELDS = [
13-
"cost",
14-
"currency",
15-
"year",
16-
"month",
17-
]
12+
_REQUIRED_FIELDS = ["cost", "currency", "billed_date"]
1813

1914

2015
class CostManager(BaseManager):
2116
def __init__(self, *args, **kwargs):
2217
super().__init__(*args, **kwargs)
23-
self.http_file_connector: HTTPFileConnector = self.locator.get_connector(
24-
HTTPFileConnector
25-
)
18+
self.default_vars = None
19+
self.field_mapper = None
2620

2721
def get_data(self, options, secret_data, schema, task_options):
28-
self.http_file_connector.create_session(options, secret_data, schema)
2922
self._check_task_options(task_options)
3023

31-
base_url = task_options["base_url"]
24+
if "default_vars" in options:
25+
self.default_vars = options["default_vars"]
26+
27+
if "field_mapper" in options:
28+
self.field_mapper = options["field_mapper"]
29+
3230
if not secret_data:
33-
response_stream = self.http_file_connector.get_cost_data(base_url)
31+
base_url = task_options["base_url"]
32+
http_file_connector = self.locator.get_connector(HTTPFileConnector)
33+
http_file_connector.create_session(options, secret_data, schema)
34+
response_stream = http_file_connector.get_cost_data(base_url)
3435
else:
3536
# just for Google Cloud Storage
37+
bucket_name = task_options["bucket_name"]
3638
storage_connector = self.locator.get_connector(
3739
GoogleStorageConnector, secret_data=secret_data
3840
)
39-
response_stream = storage_connector.get_cost_data(base_url)
41+
response_stream = storage_connector.get_cost_data(bucket_name)
42+
4043
for results in response_stream:
4144
yield self._make_cost_data(results)
4245

@@ -46,10 +49,10 @@ def _make_cost_data(self, results):
4649
result = self._apply_strip_to_dict_keys(result)
4750
result = self._apply_strip_to_dict_values(result)
4851

49-
if self.http_file_connector.field_mapper:
52+
if self.default_vars:
5053
result = self._change_result_by_field_mapper(result)
5154

52-
if self.http_file_connector.default_vars:
55+
if self.field_mapper:
5356
self._set_default_vars(result)
5457

5558
self._create_billed_date(result)
@@ -86,8 +89,8 @@ def _make_cost_data(self, results):
8689

8790
@staticmethod
8891
def _check_task_options(task_options):
89-
if "base_url" not in task_options:
90-
raise ERROR_REQUIRED_PARAMETER(key="task_options.base_url")
92+
if "base_url" not in task_options or "bucket_name" not in task_options:
93+
raise ERROR_REQUIRED_PARAMETER(key="task_options")
9194

9295
@staticmethod
9396
def _apply_strip_to_dict_keys(result):
@@ -106,7 +109,7 @@ def _apply_strip_to_dict_values(result):
106109
return result
107110

108111
def _change_result_by_field_mapper(self, result):
109-
for origin_field, actual_field in self.http_file_connector.field_mapper.items():
112+
for origin_field, actual_field in self.field_mapper.items():
110113
if isinstance(actual_field, str):
111114
if actual_field in result:
112115
result[origin_field] = result[actual_field]
@@ -124,7 +127,7 @@ def _change_result_by_field_mapper(self, result):
124127
del result[actual_additional_field]
125128
result[origin_field] = additional_info
126129

127-
return result
130+
return result
128131

129132
def _create_billed_date(self, result):
130133
if self._exist_billed_date(result):
@@ -170,7 +173,7 @@ def _apply_parse_date(date):
170173
raise e
171174

172175
def _set_default_vars(self, result):
173-
for key, value in self.http_file_connector.default_vars.items():
176+
for key, value in self.default_vars.items():
174177
result[key] = value
175178

176179
@staticmethod

src/cloudforet/cost_analysis/manager/job_manager.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
2-
from datetime import datetime, timedelta
32

3+
from spaceone.core.error import ERROR_REQUIRED_PARAMETER
44
from spaceone.core.manager import BaseManager
55
from cloudforet.cost_analysis.model.job_model import Tasks
66
from cloudforet.cost_analysis.connector.http_file_connector import HTTPFileConnector
@@ -11,21 +11,28 @@
1111
class JobManager(BaseManager):
1212
def __init__(self, *args, **kwargs):
1313
super().__init__(*args, **kwargs)
14-
self.http_file_connector: HTTPFileConnector = self.locator.get_connector(
15-
HTTPFileConnector
16-
)
1714

1815
def get_tasks(self, options, secret_data, schema, start, last_synchronized_at):
19-
self.http_file_connector.create_session(options, secret_data, schema)
20-
2116
tasks = []
2217
changed = []
18+
if "base_url" in options:
19+
http_file_connector = self.locator.get_connector(HTTPFileConnector)
20+
http_file_connector.create_session(options, secret_data, schema)
21+
22+
for base_url in http_file_connector.base_url:
23+
task_options = {"base_url": base_url}
24+
25+
tasks.append({"task_options": task_options})
26+
changed.append({"start": "1900-01"})
2327

24-
for base_url in self.http_file_connector.base_url:
25-
task_options = {"base_url": base_url}
28+
elif "bucket_name" in options:
29+
for bucket_name in options["bucket_name"]:
30+
task_options = {"bucket_name": bucket_name}
2631

27-
tasks.append({"task_options": task_options})
28-
changed.append({"start": "1900-01"})
32+
tasks.append({"task_options": task_options})
33+
changed.append({"start": "1900-01"})
34+
else:
35+
raise ERROR_REQUIRED_PARAMETER(key="options")
2936

3037
_LOGGER.debug(f"[get_tasks] tasks: {tasks}")
3138
_LOGGER.debug(f"[get_tasks] changed: {changed}")

src/cloudforet/cost_analysis/model/job_model.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66

77

88
class TaskOptions(Model):
9-
base_url = StringType(required=True)
9+
base_url = StringType(default=None)
10+
bucket_name = StringType(default=None)
1011

1112

1213
class Task(Model):

0 commit comments

Comments
 (0)