Nirmal Kumar Kosuru:
This is my Python code:

import boto3
import sys
import os
import pandas as pd
import csv
import io
import time
import numpy as np
import codecs
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
# --- Credentials and connections -------------------------------------------
# AWS credentials, read from the environment (never hard-coded).
ACCESS_KEY_ID = os.environ.get('ACCESS_KEY_ID')
SECRET_ACCESS_KEY = os.environ.get('SECRET_ACCESS_KEY')

# Snowflake credentials. These were commented out in the original paste, but
# they are referenced by create_engine(URL(...)) below, so leaving them
# undefined raises NameError the moment the script runs.
SNOWFLAKE_ACCOUNT = os.environ.get('SF_ACOUNT')  # NOTE(review): env var name looks misspelled ('SF_ACOUNT' vs 'SF_ACCOUNT') -- confirm
SNOWFLAKE_USER = os.environ.get('SF_USR')
SNOWFLAKE_PWD = os.environ.get('SF_PWD')

# Access the S3 bucket using boto3.
s3_client = boto3.client('s3')
s3_bucket_name = 'toraniapacheairflow'
s3 = boto3.resource(
    's3',
    aws_access_key_id=ACCESS_KEY_ID,
    aws_secret_access_key=SECRET_ACCESS_KEY,
)

# Snowflake SQLAlchemy engine. In the paste, "engine = create_engine(URL("
# had been fused into the preceding comment, which left the keyword
# arguments below as a bare syntax error; restored here.
# fast_executemany=True was removed: it is a pyodbc create_engine() flag,
# not a Snowflake URL component, and the Snowflake dialect does not accept it.
engine = create_engine(URL(
    account=SNOWFLAKE_ACCOUNT,
    user=SNOWFLAKE_USER,
    password=SNOWFLAKE_PWD,
    role="SYSADMIN",
    warehouse="WORLDPANTRYDW",
    database="DATAANALYTICS",
    schema="PUBLIC",
))
# Collect the S3 keys of every CSV file under the WorldPantry source prefix.
my_bucket = s3.Bucket(s3_bucket_name)
bucket_list = [
    obj.key
    for obj in my_bucket.objects.filter(Prefix='Source_files/WorldPantry/')
    # Substring membership test -- identical to the original
    # `key.find(".csv") != -1` but idiomatic.
    if ".csv" in obj.key
]
length_bucket_list = len(bucket_list)
from io import StringIO  # NOTE(review): appears unused here; kept in case later (cut) code needs it

# Download each CSV from S3 and parse it into a DataFrame.
df = []  # list of per-file DataFrames, combined into one frame below
for file in bucket_list:
    path, filename = os.path.split(file)
    obj = s3.Object(s3_bucket_name, file)
    data = obj.get()['Body'].read()
    df.append(
        pd.read_csv(
            io.BytesIO(data),
            header=0,
            delimiter=",",
            encoding="ISO-8859-1",
            low_memory=False,
            # 'error_bad_lines=False' was deprecated in pandas 1.3 and
            # removed in 2.0; on_bad_lines="skip" is the exact replacement.
            on_bad_lines="skip",
        )
    )
# Target column layout of PUBLIC.WORLDPANTRY_NEW.
converted_df = pd.DataFrame(columns=[
    'ORDER_DATE', 'ORDER_NUMBER', 'CUSTOMER_NUMBER', 'ORDER_TOTAL',
    'SHIPPING_TOTAL', 'TAX_TOTAL', 'DISCOUNT_TOTAL', 'PRODUCT_TOTAL',
    'SHIPPING_INFO_TOTAL', 'PRODUCT_ID', 'VARIANT_ID', 'CATALOG_NAME',
    'QUANTITY', 'PRICE', 'DISPLAY_NAME', 'SHIPPING_FIRSTNAME',
    'SHIPPING_LASTNAME', 'SHIPPING_ADDRESS_LINE1', 'SHIPPING_ADDRESS_LINE2',
    'SHIPPING_CITY', 'SHIPPING_STATE', 'SHIPPING_ZIP', 'SHIPPING_COUNTRY',
    'SHIPPING_PHONE', 'SHIPPING_EMAILADDRESS', 'BILLING_FIRSTNAME',
    'BILLING_LASTNAME', 'BILLING_ADDRESS_LINE1', 'BILLING_ADDRESS_LINE2',
    'BILLING_CITY', 'BILLING_STATE', 'BILLING_ZIP', 'BILLING_COUNTRY',
    'BILLING_PHONE', 'Billing_EmailAddress', 'OFFER_CODE', 'PAYMENT_METHOD',
])

# Stack every downloaded frame positionally in a single pass.
# The original re-ran np.concatenate once per file (quadratic in total rows)
# and, by omitting the columns argument, replaced the named columns above
# with integer labels; concatenating once and re-assigning the declared
# names fixes both. It also printed stale `path`/`filename` values left over
# from the previous loop, which only reflected the last file.
# NOTE(review): this assumes every source CSV has exactly these 37 columns
# in this order -- confirm against the export format.
if df:
    stacked = np.concatenate([frame.values for frame in df])
    converted_df = pd.DataFrame(stacked, columns=converted_df.columns)

converted_df.fillna('', inplace=True)
print(converted_df.info())
print(converted_df.shape[0])
print(converted_df.dtypes)
print(converted_df.columns)
print(converted_df)
try:
with engine.connect() as con:
db_cols = list(pd.read_sql('SELECT * FROM PUBLIC.WORLDPANTRY_NEW WHERE 1=2', con))
(converted_df
.rename(columns=dict(zip(converted_df.columns, db_cols)))
.to_sql(name="WORLDPANTRY_NEW",
con=engine,
if_exists="append",
index=False,
chunksize=10000,
index_label=None #,
#method = 'multi'
))
#sql = (
# "INSERT INTO WORLDPANTRY_NEW(ORDER_DATE,ORDER_NUMBER,CUSTOMER_NUMBER,ORDER_TOTAL,SHIPPING_TOTAL,TAX_TOTAL,DISCOUNT_TOTAL,PRODUCT_TOTAL,SHIPPING_INFO_TOTAL,PRODUCT_ID,VARIANT_ID,CATALOG_NAME,QUANTITY,PRICE,DISPLAY_NAME,SHIPPING_FIRSTNAME,SHIPPING_LASTNAME,SHIPPING_ADDRESS_LINE1,SHIPPING_ADDRESS_LINE2,SHIPPING_CITY,SHIPPING_STATE,SHIPPING_ZIP,SHIPPING_COUNTRY,SHIPPING_PHONE,SHIPPING_EMAILADDRESS,BILLING_FIRSTNAME,BILLING_LASTNAME,BILLING_ADDRESS_LINE1,BILLING_ADDRESS_LINE2,BILLING_CITY,BILLING_STATE,BILLING_ZIP,BILLING_COUNTRY,BILLING_PHONE,Billing_EmailAddress,OFFER_CODE,PAYMENT_METHOD) VALUES ()")
# con.execute(sql, tuple(row))
#con.close()
#engine.dispose()
except engine.connector.errors.ProgrammingError as e:
#except exception as e:
[message cut]