
Creating a Data Pipeline to BigQuery Using Cloud Functions and Cloud Scheduler

I am trying to build a data pipeline that downloads the data from this website and pushes it to a BigQuery table.

def OH_Data_Pipeline(trigger='Yes'):
    if trigger=='Yes':

Solution 1:

When I tested your code, I received the following error message: OH_Data_Pipeline() takes from 0 to 1 positional arguments but 2 were given

Cloud Functions calls a background function with two arguments (event and context), so you should change your function definition to match the sample code (I'm not sure what trigger is used for, so for now I'm just hardcoding it to 'Yes'):

def OH_Data_Pipeline(event, context):
    trigger='Yes'
    ...
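If the function is deployed with a Pub/Sub trigger and Cloud Scheduler publishes to that topic, the scheduler's message body arrives base64-encoded in event['data'], so trigger could come from the scheduler job instead of being hardcoded. A minimal sketch under that assumption; run_pipeline() is a hypothetical placeholder for the download, transform and load steps:

```python
import base64


def run_pipeline() -> str:
    # Hypothetical placeholder for the download/transform/load code.
    return 'pipeline ran'


def OH_Data_Pipeline(event, context):
    """Background-function entry point: (event, context) instead of (trigger='Yes')."""
    trigger = 'Yes'  # default when the message carries no payload
    data = event.get('data') if isinstance(event, dict) else None
    if data:
        # Pub/Sub message data is base64-encoded in the event payload.
        trigger = base64.b64decode(data).decode('utf-8').strip()
    if trigger == 'Yes':
        return run_pipeline()
    return 'skipped'
```

With this signature, a Cloud Scheduler job whose Pub/Sub message body is Yes runs the pipeline, and any other body skips it.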

Also, make sure you have a requirements.txt file that lists the libraries your code imports. datetime is part of the Python standard library, so it does not need to be listed:

pandas
pandas_gbq

After making all of these changes, I received this error:

Error: function terminated. Recommended action: inspect logs for termination reason. Details:
<urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1051)>

There seems to be an issue with the SSL certificate of the domain you are trying to access. read_csv() fetches URLs with urllib and has no option to change certificate verification, so you will need to download the file yourself (for example with requests, as in Solution 2 below) and pass the contents to read_csv().
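A safer alternative to disabling verification is to keep it on and tell Python which CA certificates to trust. The sketch below assumes the error comes from an incomplete certificate chain on the server (a common cause of "unable to get local issuer certificate") and that you have saved the site's intermediate and root certificates to a PEM file; the function names and the cafile path are illustrative, not part of the original code:

```python
import io
import ssl
import urllib.request
from typing import Optional

import pandas as pd


def parse_voter_csv(raw: bytes) -> pd.DataFrame:
    # Decode as Latin-1, matching the encoding used in the read_csv() calls in this thread.
    return pd.read_csv(io.BytesIO(raw), encoding='latin1', low_memory=False)


def fetch_voter_csv(url: str, cafile: Optional[str] = None) -> pd.DataFrame:
    # cafile: hypothetical path to a PEM bundle with the intermediate and root CA
    # certificates; None uses the system's default trust store.
    ctx = ssl.create_default_context(cafile=cafile)
    with urllib.request.urlopen(url, context=ctx, timeout=300) as resp:
        raw = resp.read()
    return parse_voter_csv(raw)
```

Verification stays enabled; only the set of trusted certificates changes. With requests, the equivalent is passing the bundle path as verify='path/to/bundle.pem' instead of verify=False.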

Solution 2:

I changed your code a little to run on my machine and also in Cloud Functions.

def main(arg):
    import pandas as pd
    import pandas_gbq
    import datetime, io, requests
    from google.oauth2 import service_account
    schema=[{'name': 'SOS_VOTERID', 'type': 'STRING'},{'name': 'COUNTY_NUMBER', 'type': 'STRING'}, {'name': 'COUNTY_ID', 'type': 'INT64'}, {'name': 'LAST_NAME', 'type': 'STRING'}, {'name': 'FIRST_NAME', 'type': 'STRING'}, {'name': 'MIDDLE_NAME', 'type': 'STRING'}, {'name': 'SUFFIX', 'type': 'STRING'}, {'name': 'DATE_OF_BIRTH', 'type': 'DATE'}, 
        {'name': 'REGISTRATION_DATE', 'type': 'DATE'}, {'name': 'VOTER_STATUS', 'type': 'STRING'}, 
        {'name': 'PARTY_AFFILIATION', 'type': 'STRING'}, {'name': 'RESIDENTIAL_ADDRESS1', 'type': 'STRING'}, 
        {'name': 'RESIDENTIAL_SECONDARY_ADDR', 'type': 'STRING'}, {'name': 'RESIDENTIAL_CITY', 'type': 'STRING'}, 
        {'name': 'RESIDENTIAL_STATE', 'type': 'STRING'}, {'name': 'RESIDENTIAL_ZIP', 'type': 'STRING'}, 
        {'name': 'RESIDENTIAL_ZIP_PLUS4', 'type': 'STRING'}, {'name': 'RESIDENTIAL_COUNTRY', 'type': 'STRING'}, 
        {'name': 'RESIDENTIAL_POSTALCODE', 'type': 'STRING'}, {'name': 'MAILING_ADDRESS1', 'type': 'STRING'}, 
        {'name': 'MAILING_SECONDARY_ADDRESS', 'type': 'STRING'}, {'name': 'MAILING_CITY', 'type': 'STRING'}, 
        {'name': 'MAILING_STATE', 'type': 'STRING'}, {'name': 'MAILING_ZIP', 'type': 'STRING'}, 
        {'name': 'MAILING_ZIP_PLUS4', 'type': 'STRING'}, {'name': 'MAILING_COUNTRY', 'type': 'STRING'}, 
        {'name': 'MAILING_POSTAL_CODE', 'type': 'STRING'}, {'name': 'CAREER_CENTER', 'type': 'STRING'}, 
        {'name': 'CITY', 'type': 'STRING'}, {'name': 'CITY_SCHOOL_DISTRICT', 'type': 'STRING'}, 
        {'name': 'COUNTY_COURT_DISTRICT', 'type': 'STRING'}, {'name': 'CONGRESSIONAL_DISTRICT', 'type': 'STRING'}, 
        {'name': 'COURT_OF_APPEALS', 'type': 'STRING'}, {'name': 'EDU_SERVICE_CENTER_DISTRICT', 'type': 'STRING'}, 
        {'name': 'EXEMPTED_VILL_SCHOOL_DISTRICT', 'type': 'STRING'}, {'name': 'LIBRARY', 'type': 'STRING'}, 
        {'name': 'LOCAL_SCHOOL_DISTRICT', 'type': 'STRING'}, {'name': 'MUNICIPAL_COURT_DISTRICT', 'type': 'STRING'}, 
        {'name': 'PRECINCT_NAME', 'type': 'STRING'}, {'name': 'PRECINCT_CODE', 'type': 'STRING'}, 
        {'name': 'STATE_BOARD_OF_EDUCATION', 'type': 'STRING'}, {'name': 'STATE_REPRESENTATIVE_DISTRICT', 'type': 'STRING'}, 
        {'name': 'STATE_SENATE_DISTRICT', 'type': 'STRING'}, {'name': 'TOWNSHIP', 'type': 'STRING'}, 
        {'name': 'VILLAGE', 'type': 'STRING'}, {'name': 'WARD', 'type': 'STRING'}, 
        {'name': 'PRIMARY_03_07_2000', 'type': 'STRING'}, {'name': 'GENERAL_11_07_2000', 'type': 'INT64'}, 
        {'name': 'SPECIAL_05_08_2001', 'type': 'STRING'}, {'name': 'GENERAL_11_06_2001', 'type': 'INT64'}, 
        {'name': 'PRIMARY_05_07_2002', 'type': 'STRING'}, {'name': 'GENERAL_11_05_2002', 'type': 'INT64'}, 
        {'name': 'SPECIAL_05_06_2003', 'type': 'STRING'}, {'name': 'GENERAL_11_04_2003', 'type': 'INT64'}, 
        {'name': 'PRIMARY_03_02_2004', 'type': 'STRING'}, {'name': 'GENERAL_11_02_2004', 'type': 'INT64'}, 
        {'name': 'SPECIAL_02_08_2005', 'type': 'STRING'}, {'name': 'PRIMARY_05_03_2005', 'type': 'STRING'}, 
        {'name': 'PRIMARY_09_13_2005', 'type': 'STRING'}, {'name': 'GENERAL_11_08_2005', 'type': 'INT64'}, 
        {'name': 'SPECIAL_02_07_2006', 'type': 'STRING'}, {'name': 'PRIMARY_05_02_2006', 'type': 'STRING'}, 
        {'name': 'GENERAL_11_07_2006', 'type': 'INT64'}, {'name': 'PRIMARY_05_08_2007', 'type': 'STRING'}, 
        {'name': 'PRIMARY_09_11_2007', 'type': 'STRING'}, {'name': 'GENERAL_11_06_2007', 'type': 'INT64'}, 
        {'name': 'PRIMARY_11_06_2007', 'type': 'STRING'}, {'name': 'GENERAL_12_11_2007', 'type': 'INT64'}, 
        {'name': 'PRIMARY_03_04_2008', 'type': 'STRING'}, {'name': 'PRIMARY_10_14_2008', 'type': 'STRING'}, 
        {'name': 'GENERAL_11_04_2008', 'type': 'INT64'}, {'name': 'GENERAL_11_18_2008', 'type': 'INT64'}, 
        {'name': 'PRIMARY_05_05_2009', 'type': 'STRING'}, {'name': 'PRIMARY_09_08_2009', 'type': 'STRING'}, 
        {'name': 'PRIMARY_09_15_2009', 'type': 'STRING'}, {'name': 'PRIMARY_09_29_2009', 'type': 'STRING'}, 
        {'name': 'GENERAL_11_03_2009', 'type': 'INT64'}, {'name': 'PRIMARY_05_04_2010', 'type': 'STRING'}, 
        {'name': 'PRIMARY_07_13_2010', 'type': 'STRING'}, {'name': 'PRIMARY_09_07_2010', 'type': 'STRING'}, 
        {'name': 'GENERAL_11_02_2010', 'type': 'INT64'}, {'name': 'PRIMARY_05_03_2011', 'type': 'STRING'}, 
        {'name': 'PRIMARY_09_13_2011', 'type': 'STRING'}, {'name': 'GENERAL_11_08_2011', 'type': 'INT64'}, 
        {'name': 'PRIMARY_03_06_2012', 'type': 'STRING'}, {'name': 'GENERAL_11_06_2012', 'type': 'INT64'}, 
        {'name': 'PRIMARY_05_07_2013', 'type': 'STRING'}, {'name': 'PRIMARY_09_10_2013', 'type': 'STRING'}, 
        {'name': 'PRIMARY_10_01_2013', 'type': 'STRING'}, {'name': 'GENERAL_11_05_2013', 'type': 'INT64'}, 
        {'name': 'PRIMARY_05_06_2014', 'type': 'STRING'}, {'name': 'GENERAL_11_04_2014', 'type': 'INT64'}, 
        {'name': 'PRIMARY_05_05_2015', 'type': 'STRING'}, {'name': 'PRIMARY_09_15_2015', 'type': 'STRING'}, 
        {'name': 'GENERAL_11_03_2015', 'type': 'INT64'}, {'name': 'PRIMARY_03_15_2016', 'type': 'STRING'}, 
        {'name': 'GENERAL_06_07_2016', 'type': 'INT64'}, {'name': 'PRIMARY_09_13_2016', 'type': 'STRING'}, 
        {'name': 'GENERAL_11_08_2016', 'type': 'INT64'}, {'name': 'PRIMARY_05_02_2017', 'type': 'STRING'}, 
        {'name': 'PRIMARY_09_12_2017', 'type': 'STRING'}, {'name': 'GENERAL_11_07_2017', 'type': 'INT64'}, 
        {'name': 'PRIMARY_05_08_2018', 'type': 'STRING'}, {'name': 'GENERAL_08_07_2018', 'type': 'INT64'}, 
        {'name': 'GENERAL_11_06_2018', 'type': 'INT64'}, {'name': 'PRIMARY_05_07_2019', 'type': 'STRING'}, 
        {'name': 'PRIMARY_09_10_2019', 'type': 'STRING'}, {'name': 'GENERAL_11_05_2019', 'type': 'INT64'}]
    prim_list = ['PRIMARY-03/07/2000', 'SPECIAL-05/08/2001', 'PRIMARY-05/07/2002', 'SPECIAL-05/06/2003', 'PRIMARY-03/02/2004', 
            'SPECIAL-02/08/2005', 'PRIMARY-05/03/2005', 'PRIMARY-09/13/2005', 'SPECIAL-02/07/2006', 'PRIMARY-05/02/2006', 
            'PRIMARY-05/08/2007', 'PRIMARY-09/11/2007', 'PRIMARY-11/06/2007', 'PRIMARY-03/04/2008', 'PRIMARY-10/14/2008', 
            'PRIMARY-05/05/2009', 'PRIMARY-09/08/2009', 'PRIMARY-09/15/2009', 'PRIMARY-09/29/2009', 'PRIMARY-05/04/2010', 
            'PRIMARY-07/13/2010', 'PRIMARY-09/07/2010', 'PRIMARY-05/03/2011', 'PRIMARY-09/13/2011', 'PRIMARY-03/06/2012', 
            'PRIMARY-05/07/2013', 'PRIMARY-09/10/2013', 'PRIMARY-10/01/2013', 'PRIMARY-05/06/2014', 'PRIMARY-05/05/2015', 
            'PRIMARY-09/15/2015', 'PRIMARY-03/15/2016', 'PRIMARY-09/13/2016', 'PRIMARY-05/02/2017', 'PRIMARY-09/12/2017', 
            'PRIMARY-05/08/2018', 'PRIMARY-05/07/2019', 'PRIMARY-09/10/2019']
    prim_list = [f.replace('-', '_').replace('/', '_') for f in prim_list]
    gen_list = ['GENERAL-11/07/2000', 'GENERAL-11/06/2001', 'GENERAL-11/05/2002', 'GENERAL-11/04/2003', 'GENERAL-11/02/2004', 
           'GENERAL-11/08/2005', 'GENERAL-11/07/2006', 'GENERAL-11/06/2007', 'GENERAL-12/11/2007', 'GENERAL-11/04/2008', 
           'GENERAL-11/18/2008', 'GENERAL-11/03/2009', 'GENERAL-11/02/2010', 'GENERAL-11/08/2011', 'GENERAL-11/06/2012', 
           'GENERAL-11/05/2013', 'GENERAL-11/04/2014', 'GENERAL-11/03/2015', 'GENERAL-06/07/2016', 'GENERAL-11/08/2016', 
           'GENERAL-11/07/2017', 'GENERAL-08/07/2018', 'GENERAL-11/06/2018', 'GENERAL-11/05/2019']
    gen_list = [f.replace('-', '_').replace('/', '_') for f in gen_list]
    party_list = ['PARTY_AFFILIATION']
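    url = 'https://www6.sos.state.oh.us/ords/f?p=VOTERFTP:DOWNLOAD::FILE:NO:2:P2_PRODUCT_NUMBER:{}'
    # verify=False skips TLS certificate checks (testing only, see the note below).
    # .content (raw bytes) lets read_csv apply encoding='Latin1'; with .text the
    # data is already decoded and the encoding argument has no effect.
    df = [pd.read_csv(io.BytesIO(requests.get(url.format(88 + f), verify=False).content),
                      encoding='Latin1', low_memory=False)
          for f in range(1, 2)]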
    #df=[pd.read_csv('https://www6.sos.state.oh.us/ords/f?p=VOTERFTP:DOWNLOAD::FILE:NO:2:P2_PRODUCT_NUMBER:{}'.format(88+f), encoding='Latin1', low_memory=False) for f in range(1, 17)]
    df=pd.concat(df)
    df.columns = [f.replace('-', '_').replace('/', '_') for f in df.columns]
    df['birth_year'] = df['DATE_OF_BIRTH'].map(lambda x: str(x)[:-6]).astype(int)
    df['Age'] = datetime.datetime.now().year - df['birth_year']
    for f in prim_list:
        df.loc[df[f]=='D', f]='Democrat'
        df.loc[df[f]=='R', f]='Republican'
        df.loc[df[f]=='G', f]='Green'
        df.loc[df[f]=='E', f]='Reform'
        df.loc[df[f]=='L', f]='Libertarian'
        df.loc[df[f]=='C', f]='Constitution'
        df.loc[df[f]=='N', f]='Natural Law'
        df.loc[df[f]=='S', f]='Socialist'
        df.loc[df[f]=='X', f]='Without Affiliation'
        df.loc[(df[f]=='') | (df[f].isnull()==True) | (df[f]==0), f]='Not Voted'
    for f in party_list:
        df.loc[df[f]=='D', f]='Democrat'
        df.loc[df[f]=='R', f]='Republican'
        df.loc[df[f]=='G', f]='Green'
        df.loc[df[f]=='E', f]='Reform'
        df.loc[df[f]=='L', f]='Libertarian'
        df.loc[df[f]=='C', f]='Constitution'
        df.loc[df[f]=='N', f]='Natural Law'
        df.loc[df[f]=='S', f]='Socialist'
        df.loc[df[f]=='X', f]='Unaffiliated'
        df.loc[(df[f]=='') | (df[f].isnull()==True) | (df[f]==0), f]='Unaffiliated'
    for g in gen_list:
        df.loc[(df[g]!='') & (df[g].isnull()!=True) & (df[g]!=0) & (df[g]!='NaN'), g]=1
        df.loc[(df[g]=='') | (df[g].isnull()==True) | (df[g]==0) | (df[g]=='NaN'), g]=0
    df[gen_list]=df[gen_list].astype(int)
    df[prim_list]=df[prim_list].astype(str)
    df[party_list]=df[party_list].astype(str)
    df.to_gbq(destination_table='Voterfile.OH_Voterfile', project_id='astute-acolyte-260912', if_exists='replace', table_schema=schema, reauth=False)
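
The three recoding loops in main() can also be written as column-wise operations, which avoids repeating the df.loc assignments for every party code. A sketch that reproduces the same mapping and the same "blank" tests as the loops; decode_party and voted_flag are illustrative helper names:

```python
import pandas as pd

# Same code-to-name mapping as the df.loc assignments; only 'X' differs
# between the primary columns and PARTY_AFFILIATION.
PARTY_CODES = {'D': 'Democrat', 'R': 'Republican', 'G': 'Green', 'E': 'Reform',
               'L': 'Libertarian', 'C': 'Constitution', 'N': 'Natural Law',
               'S': 'Socialist'}


def decode_party(col: pd.Series, x_label: str, blank_label: str) -> pd.Series:
    # Cells that are '', null or 0 get blank_label, as in the original loops.
    blank = col.isnull() | (col == '') | (col == 0)
    # Unknown codes are left unchanged, like the original loops.
    return col.replace({**PARTY_CODES, 'X': x_label}).mask(blank, blank_label)


def voted_flag(col: pd.Series) -> pd.Series:
    # 1 if the voter has any entry for this general election, else 0.
    blank = col.isnull() | (col == '') | (col == 0) | (col == 'NaN')
    return (~blank).astype(int)
```

Inside main() these would be used as df[f] = decode_party(df[f], 'Without Affiliation', 'Not Voted') for each primary column, decode_party(df['PARTY_AFFILIATION'], 'Unaffiliated', 'Unaffiliated') for the party column, and df[gen_list] = df[gen_list].apply(voted_flag).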

Note that verify=False is ONLY for testing purposes: it disables TLS certificate verification and must not be used in production. After running your code on my machine and in Cloud Functions, I realized two things:

  1. Your code takes a long time to run, since it has to download and process the files. Given that, it is not a good fit for Cloud Functions, which has a maximum timeout of 9 minutes, as you can see here.
  2. To perform all these downloads and transformations, your code uses a lot of memory. I tried running it on Cloud Functions with the maximum amount of memory available (2 GB), and it still reached the limit.
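One way to lower peak memory (it does not help with the timeout) is to process one downloaded file at a time and write each result to the table, instead of concatenating all sixteen files from range(1, 17) first. A sketch assuming the per-file transformations are independent of each other, which holds for the column recoding in main(); load_in_batches and its callables are illustrative:

```python
from typing import Callable, Iterable

import pandas as pd


def load_in_batches(frames: Iterable[pd.DataFrame],
                    transform: Callable[[pd.DataFrame], pd.DataFrame],
                    write: Callable[[pd.DataFrame, str], None]) -> int:
    """Transform and write each frame separately; return the total rows written.

    The first batch is written with 'replace' so a rerun starts from a fresh
    table, later batches with 'append'. If `frames` is a lazy generator that
    downloads on demand, only about one file is held in memory at a time.
    """
    total = 0
    for i, frame in enumerate(frames):
        out = transform(frame)
        write(out, 'replace' if i == 0 else 'append')
        total += len(out)
        del frame, out  # drop references before the next download
    return total
```

In main(), write could be lambda d, mode: d.to_gbq(destination_table='Voterfile.OH_Voterfile', project_id='astute-acolyte-260912', if_exists=mode, table_schema=schema, reauth=False), and frames a generator that downloads one product number per step.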

You could try using a Compute Engine VM instead. In that case you can also use Cloud Scheduler to start and stop the VM at exactly the times you want. You can find the tutorial for that here.

To create a VM in Compute Engine, you can follow the tutorial "Creating an instance from a public image".

Keep in mind that with Compute Engine VMs you pay for processing while the VM is running, and for the VM's disk storage whether the VM is running or stopped. After creating the VM, you can access it through the console. Prepare the environment for your code on the VM the same way you did on your local machine.

Once your environment is configured and your code is ready to deploy, you can use crontab to have the system run your script on a schedule.
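Since cron runs a script rather than a Cloud Functions entry point, the script needs a small wrapper that calls main() and reports success or failure through its exit code and logs. A minimal sketch; main() here is only a placeholder for the answer's function, and run_job is an illustrative name:

```python
import logging
from typing import Callable


def main(arg):
    # Placeholder for the answer's main(arg); paste the real pipeline code here.
    pass


def run_job(job: Callable[[object], None]) -> int:
    """Run the pipeline once and convert the outcome into a process exit code."""
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s')
    try:
        job(None)  # main() does not use its argument outside Cloud Functions
    except Exception:
        logging.exception('pipeline failed')
        return 1
    logging.info('pipeline finished')
    return 0
```

At the bottom of the script, if __name__ == '__main__': sys.exit(run_job(main)) (with import sys) makes it a cron-friendly command, scheduled with a crontab line such as 0 6 * * * /usr/bin/python3 /home/USER/oh_pipeline.py, where the path and time are placeholders.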

The last step is to configure Cloud Scheduler to start and stop your VM at the right times. You can find the tutorial here. Schedule the VM to start a few minutes before the time you set in the crontab.
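Cloud Scheduler and cron both accept unix-cron expressions, so the three schedules can be derived from the single time at which the crontab runs the script. A sketch assuming a daily run and that the Cloud Scheduler jobs use the same time zone as the VM's clock (Scheduler jobs have their own time zone setting); lead_minutes and max_runtime_minutes are illustrative defaults that you would tune to your job:

```python
from datetime import datetime, timedelta
from typing import Dict


def daily_cron(t: datetime) -> str:
    """unix-cron expression that fires every day at t's hour and minute."""
    return f'{t.minute} {t.hour} * * *'


def vm_schedules(job_time: datetime, lead_minutes: int = 10,
                 max_runtime_minutes: int = 120) -> Dict[str, str]:
    # Start the VM a little before the crontab entry fires, and stop it only
    # after the longest expected run has finished.
    start = job_time - timedelta(minutes=lead_minutes)
    stop = job_time + timedelta(minutes=max_runtime_minutes)
    return {
        'scheduler_start_vm': daily_cron(start),
        'vm_crontab': daily_cron(job_time),
        'scheduler_stop_vm': daily_cron(stop),
    }
```

For a 06:00 run, the defaults give '50 5 * * *' to start the VM, '0 6 * * *' for the crontab and '0 8 * * *' to stop it; using datetime arithmetic means a run just after midnight correctly starts the VM late on the previous day.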
