API details.

GCS
import json
from io import BytesIO
import pandas as pd
from google.cloud import storage
class GCSConnector:
    """
    Object: GCSConnector(Object)
    Purpose: Connector to the GCS account
    """
    def __init__(self, credentials, bucketname):
        """
        Initialize Google Cloud Storage Connector to bucket
        :param credentials: (str) path to the service-account JSON credentials file
        :param bucketname: (str) bucket name
        """
        self._CREDENTIALS = credentials
        self._BUCKETNAME = bucketname
        self._gcsclient = storage.Client.from_service_account_json(self._CREDENTIALS)
        # get_bucket() performs an API call, so a missing/forbidden bucket
        # fails loudly here at construction time rather than on first use.
        self._bucket = self._gcsclient.get_bucket(self._BUCKETNAME)

    def get_file(self, filename):
        """
        Get file content from GCS
        :param filename: (str) blob name within the bucket
        :return: (BytesIO) blob content as an in-memory byte stream
        """
        blob = self._bucket.blob(filename)
        # download_as_bytes() is the current API; download_as_string() is a
        # deprecated alias that returned the same bytes.
        content = blob.download_as_bytes()
        return BytesIO(content)

    def send_json(self, json_file, filename):
        """
        Serialize a JSON-compatible object and upload it to the bucket.
        :param json_file: (dict|list) JSON-serializable object
        :param filename: (str) destination blob name
        :return: None
        """
        # ensure_ascii=False keeps non-ASCII characters readable in the blob
        self._bucket.blob(filename).upload_from_string(
            json.dumps(json_file, ensure_ascii=False))

    def send_dataframe(self, df, filename, **kwargs):
        """
        Serialize a DataFrame to CSV and upload it to the bucket.
        :param df: (pd.DataFrame) frame to upload
        :param filename: (str) destination blob name
        :param kwargs: forwarded to pd.DataFrame.to_csv
        :return: None
        """
        self._bucket.blob(filename).upload_from_string(
            df.to_csv(**kwargs), content_type="application/octet-stream")

    def open_csv_as_dataframe(self, filename, **kwargs):
        """
        Download a CSV blob and parse it into a DataFrame.
        :param filename: (str) blob name within the bucket
        :param kwargs: forwarded to pd.read_csv
        :return: (pd.DataFrame)
        """
        return pd.read_csv(self.get_file(filename=filename), **kwargs)

    def open_json_as_dataframe(self, filename, **kwargs):
        """
        Download a JSON blob and parse it into a DataFrame.
        :param filename: (str) blob name within the bucket
        :param kwargs: forwarded to pd.read_json
        :return: (pd.DataFrame)
        """
        return pd.read_json(self.get_file(filename=filename), **kwargs)

    def open_excel_as_dataframe(self, filename, **kwargs):
        """
        Download an Excel blob and parse it into a DataFrame.
        :param filename: (str) blob name within the bucket
        :param kwargs: forwarded to pd.read_excel
        :return: (pd.DataFrame)
        """
        return pd.read_excel(self.get_file(filename=filename), **kwargs)

    def file_exists(self, filename):
        """
        Check if 'filename' file exists within bucket
        :param filename: (str) blob name within the bucket
        :return: (bool)
        """
        # Built from the bucket for consistency with the other methods;
        # exists() issues a metadata request with the connector's client.
        return self._bucket.blob(filename).exists(self._gcsclient)

    def list_files(self, prefix, delimiter=None):
        """
        List blob names in the bucket under a prefix.
        :param prefix: (str) name prefix filter
        :param delimiter: (str|None) directory-style delimiter (e.g. '/')
        :return: (list[str]) matching blob names
        """
        return [blob.name for blob in self._bucket.list_blobs(prefix=prefix, delimiter=delimiter)]
Big Query
import pandas_gbq
from google.cloud import bigquery
from google.oauth2 import service_account
class BQConnector:
    """
    Object: BQConnector(Object)
    Purpose: Connector to the Big Query account
    """
    def __init__(self, credentials, project_id):
        """
        Initialize the BigQuery connector.
        :param credentials: (str) path to the service-account JSON credentials file
        :param project_id: (str) GCP project used for queries and writes
        """
        self.project_id = project_id
        # Parse the service-account file once; with_scopes() returns a scoped
        # *copy*, so the unscoped base can safely be reused for pandas_gbq.
        base_credentials = service_account.Credentials.from_service_account_file(
            credentials
        )
        # Drive scope is required (in addition to cloud-platform) so queries
        # can read Drive-backed external tables such as Google Sheets.
        self.credentials = base_credentials.with_scopes(
            [
                'https://www.googleapis.com/auth/drive',
                'https://www.googleapis.com/auth/cloud-platform'
            ]
        )
        # Pass project explicitly so the client targets the same project that
        # write_df uses, instead of inferring one from the credentials.
        self._client = bigquery.Client(
            credentials=self.credentials, project=self.project_id
        )
        self._credentials_gbq = base_credentials

    def read_df(self, bq_sql_query):
        """
        Run a SQL query and return its result set.
        :param bq_sql_query: (str) BigQuery SQL query
        :return: (pd.DataFrame) query result
        """
        return self._client.query(bq_sql_query).to_dataframe()

    def write_df(self, df_to_write, dataset, table, if_exists='replace'):
        """
        Write a DataFrame to a BigQuery table via pandas_gbq.
        :param df_to_write: (pd.DataFrame) frame to upload
        :param dataset: (str) destination dataset name
        :param table: (str) destination table name
        :param if_exists: (str) 'replace' (default), 'append' or 'fail'
        :return: None
        """
        pandas_gbq.to_gbq(
            df_to_write
            , '{}.{}'.format(dataset, table)
            , project_id=self.project_id
            , if_exists=if_exists
            , credentials=self._credentials_gbq
        )

    def run_job(self, sql_query):
        """
        Execute a SQL statement and block until it completes.
        :param sql_query: (str) BigQuery SQL statement (e.g. DDL/DML)
        :return: None
        """
        # result() waits for the job, surfacing any query errors here.
        self._client.query(sql_query).result()