A few weeks ago I published a blog post, How I Saved 10 Minutes with a Python Script, and also posted about it on LinkedIn. The data community replied with some great suggestions, mostly about automation: updating the dates and week number automatically, and having the script run on its own schedule. Most of the main Python script remains the same (getting data from a database, doing some data manipulation, and exporting a CSV file), so I will mainly be talking about the automation changes.
This was the original reason for creating my script: Automate a weekly executive report I generate. It gives an overview (financially) of the health of the company.
In this blog post, I’ll go over how I automated that process further.
Link to my Github repository here: bigquery-python-weekly-report.
Tools Used
Here’s what I used for this project:
IDE: Visual Studio Code
Database: Google BigQuery
Package Manager: Anaconda Distribution
Version Control: Git/Github
Notes
I am not going over the basics of Python like how to use Anaconda Distribution. I’m coming out with a course later this month that will go into those details. This article assumes you already have a basic knowledge of SQL & Python.
All examples in this blog post use simplified, fictional data for educational purposes only. In the real world, tables typically contain many more rows and columns, but the code here is simplified to make it easier to follow and to protect private information.
Automation Steps
Updated Python Script
Created batch job file (needed to run specific environments)
Set up Task Scheduler
Step 1: Updated Python Script
Named weekly_sales_report_automatic.py.
Import the following libraries:
os: To create files and folders on my computer.
datetime: To work out last week’s dates and the week number.
pandas: To load the query results into a Pandas DataFrame.
google.cloud: To run a query in BigQuery.
Code Explanation
Some parts of this script are new. I mark them with [New Code]: the explanation that follows the marker, and the code block directly below it, are new. Everything else is the same code as before, so you can skip it if you’ve already read my previous article. If you haven’t, this section contains all the explanation you need.
First I set up the environment with the necessary Python libraries.
import os
from datetime import datetime, timedelta
import pandas as pd
from google.cloud import bigquery
Then I initialize the BigQuery client.
# Initialize BigQuery client to interact with the database
client = bigquery.Client()
[New Code]: I get the current date and time and store it in today. Then I set the variables start_date and end_date. start_date is the Monday of the previous week: today.weekday() is 0 on a Monday, so subtracting today.weekday() + 7 days goes back exactly 7 days when the script runs on a Monday (as scheduled) and still lands on last week’s Monday on any other day. end_date is start_date plus 6 days, which gives me last week’s data from Monday to Sunday. Then I get the week number from end_date (to match how I previously calculated the week numbers). For example, if I run this on Monday, June 10, 2024, start_date is 2024-06-03 and end_date is 2024-06-09, which is how our weekly reports run.
# Get the current date
today = datetime.today()
# Calculate start of the previous week (Monday)
start_date = today - timedelta(days=today.weekday() + 7)
# Calculate end of the previous week (Sunday)
end_date = start_date + timedelta(days=6)
# Calculate the week number for the previous week
week_num = int(end_date.strftime('%U'))
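To check the date logic without waiting for a Monday, the same calculation can be wrapped in a small function that takes the run date as an argument. This is only a sketch: the function name previous_week_range is made up for illustration and is not part of the script.

```python
from datetime import date, timedelta


def previous_week_range(today: date) -> tuple:
    """Return (Monday, Sunday, week number) for the week before `today`."""
    # weekday() is 0 for Monday, so this lands on last week's Monday
    # no matter which day of the week the function is called with.
    start = today - timedelta(days=today.weekday() + 7)
    end = start + timedelta(days=6)
    # %U counts weeks that start on Sunday, the same directive the script uses.
    week_num = int(end.strftime("%U"))
    return start, end, week_num


# A Monday run, as in the example above
print(previous_week_range(date(2024, 6, 10)))
# A Thursday run resolves to the same Monday-to-Sunday week
print(previous_week_range(date(2024, 6, 13)))
```

Passing the date in, instead of calling datetime.today() inside the function, makes it easy to try different run days or to re-create the report for an older week.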
[New Code]: I use a try block to format start_date and end_date the way I want for the CSV file and folder names. I turn start_date and end_date into strings first, then apply the same formatting as before, which turns 2024-05-13 into 5-13-24.
# Format dates to more readable form for folder creation
try:
    # Format start_date and end_date to strings for formatting
    start_date_str = start_date.strftime('%Y-%m-%d')
    end_date_str = end_date.strftime('%Y-%m-%d')
    # Parse start_date and end_date into the desired format
    start_date_components = start_date_str.split('-')
    start_date_formatted = f"{int(start_date_components[1]):g}-{int(start_date_components[2]):g}-{start_date_components[0][-2:]}"
    end_date_components = end_date_str.split('-')
    end_date_formatted = f"{int(end_date_components[1]):g}-{int(end_date_components[2]):g}-{end_date_components[0][-2:]}"
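The same 5-13-24 style can also be built straight from the date object, without splitting a string first. A minimal sketch of that alternative, where the helper name format_short is hypothetical:

```python
from datetime import date


def format_short(d: date) -> str:
    """Format a date as month-day-year with no zero padding and a 2-digit year."""
    # d.month and d.day are plain integers, so no leading zeros appear;
    # the %y format code gives the two-digit year.
    return f"{d.month}-{d.day}-{d:%y}"


print(format_short(date(2024, 5, 13)))  # 5-13-24
```

This also works on datetime objects, since they expose the same month and day attributes.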
Then I use these variables to name a new folder on my computer and define its path. An if statement checks whether this main folder exists and creates it if it doesn’t.
The main folder would be named: 20 5-13-24 to 5-19-24. The script would print out:
Folder created at: computer\file\path.
    folder_name = f"{week_num} {start_date_formatted} to {end_date_formatted}"
    # Define path for main report folder and ensure it's created
    main_folder_path = os.path.join(r'computer\file\path', folder_name)
    if not os.path.exists(main_folder_path):
        os.makedirs(main_folder_path)
        print(f"Folder created at: {main_folder_path}")
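The check-then-create pattern can also be collapsed into a single call, because os.makedirs accepts exist_ok=True. A small sketch, assuming a temporary directory as a stand-in for the real report location:

```python
import os
import tempfile

# A temporary directory stands in for the real report location (assumption).
base_dir = tempfile.mkdtemp()
folder_name = "20 5-13-24 to 5-19-24"
main_folder_path = os.path.join(base_dir, folder_name)

# exist_ok=True makes the call do nothing when the folder is already there,
# so the script can be re-run for the same week without raising an error.
os.makedirs(main_folder_path, exist_ok=True)
os.makedirs(main_folder_path, exist_ok=True)  # second call is harmless

print(f"Folder ready at: {main_folder_path}")
```

The trade-off is that the script no longer knows whether the folder was newly created, so the message says "ready" instead of "created".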
Finally, this try block has except clauses to catch possible errors and print what went wrong.
except ValueError as ve:
    print(f"Error processing date formats: {ve}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
Then we move on to the actual query. It gets customer transactions and company expenses between the dates calculated above, which the query receives as the parameters @start (for start_date) and @end (for end_date); more on why I’m using @ later. As a reminder, the actual query I use for this report is more complicated and involved, but for demonstration purposes this is a simpler query.
# Define SQL query using parameters for flexibility and security
query = """
WITH transactions AS (
SELECT
DATE(timestamp, 'America/New_York') as purchase_date,
SUM(amount_usd) as total_purchases
FROM
`project-id.database.transactions` t
WHERE
DATE(timestamp, 'America/New_York') BETWEEN @start AND @end
GROUP BY
purchase_date
),
expenses as(
SELECT
DATE(timestamp, 'America/New_York') as expense_date,
SUM(expenses) as total_expenses
FROM
`project-id.database.expenses` t
WHERE
DATE(timestamp, 'America/New_York') BETWEEN @start AND @end
GROUP BY expense_date
)
SELECT
day,
FORMAT_DATE('%a', day) as day_of_the_week,
t.total_purchases,
e.total_expenses
FROM
UNNEST(GENERATE_DATE_ARRAY(DATE(@end), DATE(@start), INTERVAL -1 DAY)) AS day
LEFT JOIN transactions t ON t.purchase_date = day
LEFT JOIN expenses e ON e.expense_date = day
WHERE
day BETWEEN @start AND @end
ORDER BY
day
"""
[New Code]: Then I set up the query parameters, passing start_date_str and end_date_str instead of start_date and end_date. The parameters are declared as STRING, so the datetime objects in start_date and end_date would not work here. The two strings are labeled start and end respectively, which is what @start and @end refer to in the query.
# Configure query parameters to safeguard against SQL injection and provide flexibility
job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("start", "STRING", start_date_str),
bigquery.ScalarQueryParameter("end", "STRING", end_date_str)
]
)
If I ran this query the results (showing only the first 5 rows) would be:
day | day_of_the_week | total_purchases | total_expenses |
2024-05-13 | Mon | 10000 | 5000 |
2024-05-14 | Tue | 15000 | 2000 |
2024-05-15 | Wed | 10000 | 3000 |
2024-05-16 | Thu | 15000 | 4000 |
2024-05-17 | Fri | 20000 | 5000 |
In a try block I execute the query and load the query results into a pandas DataFrame called df .
try:
    # Execute the query and load results into a pandas DataFrame
    query_job = client.query(query, job_config=job_config)
    df = query_job.to_dataframe()
I create and apply a function that adds a new column with a calculation. In this example it calculates the revenue, which is total_purchases (income) - total_expenses. For each row, the function reads the total_purchases and total_expenses values. Note: In theory you can do this in your SQL query. The reason I don’t in my actual report is that the final calculation depends on other calculations I do in the query, but for demonstration purposes I’ve kept it simple.
I’ve also added an if statement so that if there’s no data for total_purchases or total_expenses, the function returns None. Otherwise it returns total_purchases - total_expenses. I apply that function to my DataFrame to create a new column called revenue.
    # Function to calculate net revenue from purchases and expenses
    def calculate_revenue(row):
        total_purchases = row['total_purchases']
        total_expenses = row['total_expenses']
        if total_purchases is None or total_expenses is None:
            return None
        return total_purchases - total_expenses
    # Apply the custom function to create a new column
    df['revenue'] = df.apply(calculate_revenue, axis=1)
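For a calculation as simple as this demo one, subtracting the two columns directly is a common pandas alternative to a row-by-row function. A sketch with made-up numbers, where the second row has no expenses (as a LEFT JOIN might return):

```python
import pandas as pd

# Made-up rows; None stands for a day with no matching expenses.
df = pd.DataFrame(
    {
        "total_purchases": [10000.0, 15000.0],
        "total_expenses": [5000.0, None],
    }
)

# Subtracting whole columns works row by row; a missing value on either
# side gives a missing (NaN) result instead of raising an error.
df["revenue"] = df["total_purchases"] - df["total_expenses"]
print(df)
```

A row-wise function is still the better fit when the logic per row gets more involved than one arithmetic expression.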
Then I print this DataFrame.
    # Print DataFrame
    print("Query results:")
    print(df)
This would print out:
Query results:
day | day_of_the_week | total_purchases | total_expenses | revenue |
2024-05-13 | Mon | 10000 | 5000 | 5000 |
2024-05-14 | Tue | 15000 | 2000 | 13000 |
2024-05-15 | Wed | 10000 | 3000 | 7000 |
2024-05-16 | Thu | 15000 | 4000 | 11000 |
2024-05-17 | Fri | 20000 | 5000 | 15000 |
2024-05-18 | Sat | 15000 | 4000 | 11000 |
2024-05-19 | Sun | 10000 | 3000 | 7000 |
I also create a subfolder for the CSV file inside the main folder I created above. An if statement checks whether the CSV subfolder exists and creates it if it doesn’t. Then I define the name of the CSV file and build its full path from csv_folder_path and csv_file_name.
    # Setup paths and export results to CSV
    csv_folder_path = os.path.join(main_folder_path, 'CSV')
    # Create the CSV subfolder if it doesn't exist
    if not os.path.exists(csv_folder_path):
        os.makedirs(csv_folder_path)
    # Define the filename for the CSV file
    csv_file_name = f'Report Tables {start_date_formatted} to {end_date_formatted}.csv'
    # Combine the CSV folder path and filename for the CSV file
    csv_file_path = os.path.join(csv_folder_path, csv_file_name)
I export the DataFrame as a CSV file into that CSV subfolder. I also print information about the CSV file and where it was saved.
This would print out:
CSV file: Report Tables 5-13-24 to 5-19-24.csv exported successfully to: computer\file\path
    # Export DataFrame to CSV file
    df.to_csv(csv_file_path, index=False)
    print(f"CSV file: {csv_file_name} exported successfully to: {csv_file_path}")
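Because nobody watches the console when the script runs unattended, one optional safeguard is to read the CSV back and confirm it holds the expected number of rows. This is a sketch only, with made-up data and a temporary folder in place of the real report folder:

```python
import os
import tempfile

import pandas as pd

# Made-up data in place of the query results.
df = pd.DataFrame({"day": ["2024-05-13", "2024-05-14"], "revenue": [5000, 13000]})

# Temporary folder in place of the real report folder (assumption).
csv_folder_path = os.path.join(tempfile.mkdtemp(), "CSV")
os.makedirs(csv_folder_path, exist_ok=True)
csv_file_path = os.path.join(csv_folder_path, "Report Tables 5-13-24 to 5-19-24.csv")

df.to_csv(csv_file_path, index=False)

# Read the file back and compare it with what was written.
check = pd.read_csv(csv_file_path)
if len(check) != len(df):
    raise RuntimeError(f"Expected {len(df)} rows, found {len(check)}")
print(f"Verified {len(check)} rows in {csv_file_path}")
```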
Finally, I have an except block to handle any potential errors.
except Exception as e:
    print("An error occurred during query execution:", e)
Final Code
Below is the final code. View the final code in Github: weekly_sales_report_automatic.py . Note: the code snippets above have more comments and tables (for explanation purposes) than the final code in my Github repo.
import os
from datetime import datetime, timedelta
import pandas as pd
from google.cloud import bigquery
# Initialize BigQuery client to interact with the database
client = bigquery.Client()
# Get the current date
today = datetime.today()
# Calculate start of the previous week (Monday)
start_date = today - timedelta(days=today.weekday() + 7)
# Calculate end of the previous week (Sunday)
end_date = start_date + timedelta(days=6)
# Calculate the week number for the previous week
week_num = int(end_date.strftime('%U'))
# Format dates to more readable form for folder creation
try:
    # Format start_date and end_date to strings for formatting
    start_date_str = start_date.strftime('%Y-%m-%d')
    end_date_str = end_date.strftime('%Y-%m-%d')
    # Parse start_date and end_date into the desired format
    start_date_components = start_date_str.split('-')
    start_date_formatted = f"{int(start_date_components[1]):g}-{int(start_date_components[2]):g}-{start_date_components[0][-2:]}"
    end_date_components = end_date_str.split('-')
    end_date_formatted = f"{int(end_date_components[1]):g}-{int(end_date_components[2]):g}-{end_date_components[0][-2:]}"
    folder_name = f"{week_num} {start_date_formatted} to {end_date_formatted}"
    # Define path for main report folder and ensure it's created
    main_folder_path = os.path.join(r'computer\file\path', folder_name)
    if not os.path.exists(main_folder_path):
        os.makedirs(main_folder_path)
        print(f"Folder created at: {main_folder_path}")
except ValueError as ve:
    print(f"Error processing date formats: {ve}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
# Define SQL query using parameters for flexibility and security
query = """
WITH transactions AS (
SELECT
DATE(timestamp, 'America/New_York') as purchase_date,
SUM(amount_usd) as total_purchases
FROM
`project-id.database.transactions` t
WHERE
DATE(timestamp, 'America/New_York') BETWEEN @start AND @end
GROUP BY
purchase_date
),
expenses as(
SELECT
DATE(timestamp, 'America/New_York') as expense_date,
SUM(expenses) as total_expenses
FROM
`project-id.database.expenses` t
WHERE
DATE(timestamp, 'America/New_York') BETWEEN @start AND @end
GROUP BY
expense_date
)
SELECT
day,
FORMAT_DATE('%a', day) as day_of_the_week,
t.total_purchases,
e.total_expenses
FROM
UNNEST(GENERATE_DATE_ARRAY(DATE(@end), DATE(@start), INTERVAL -1 DAY)) AS day
LEFT JOIN transactions t ON t.purchase_date = day
LEFT JOIN expenses e ON e.expense_date = day
WHERE
day BETWEEN @start AND @end
ORDER BY
day"""
# Configure query parameters to safeguard against SQL injection and provide flexibility
job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("start", "STRING", start_date_str),
bigquery.ScalarQueryParameter("end", "STRING", end_date_str)
]
)
try:
    # Execute the query and load results into a pandas DataFrame
    query_job = client.query(query, job_config=job_config)
    df = query_job.to_dataframe()
    # Function to calculate net revenue from purchases and expenses
    def calculate_revenue(row):
        total_purchases = row['total_purchases']
        total_expenses = row['total_expenses']
        if total_purchases is None or total_expenses is None:
            return None
        return total_purchases - total_expenses
    # Apply the custom function to create a new column
    df['revenue'] = df.apply(calculate_revenue, axis=1)
    print("Query results:")
    print(df)
    # Setup paths and export results to CSV for further analysis and reporting
    csv_folder_path = os.path.join(main_folder_path, 'CSV')
    if not os.path.exists(csv_folder_path):
        os.makedirs(csv_folder_path)
    csv_file_name = f'Report Tables {start_date_formatted} to {end_date_formatted}.csv'
    csv_file_path = os.path.join(csv_folder_path, csv_file_name)
    df.to_csv(csv_file_path, index=False)
    print(f"CSV file: {csv_file_name} exported successfully to: {csv_file_path}")
except Exception as e:
    print("An error occurred during query execution:", e)
Step 2: Create batch file
Named: run_weekly_report_script.bat.
This batch file includes all the necessary commands to set up your Python environment and run your Python script.
The first command is used in batch files to stop the command prompt from displaying the commands as they are executed. This makes the output cleaner and less cluttered.
@echo off
Then the line calls the activate.bat batch script, which is part of the Anaconda installation. The script sets up the environment variables necessary for Anaconda to run. This line specifically ensures that the root Anaconda environment is activated, which allows you to use Anaconda's Python and other tools.
CALL C:\Users\user_name\anaconda3\Scripts\activate.bat C:\Users\user_name\anaconda3
This command activates a conda virtual environment named venv1. Virtual environments are used to manage separate package installations for different projects. By activating venv1, you ensure that the script runs with the specific packages and Python version configured for this environment.
CALL conda activate venv1
The first part of this runs a Python script located at C:\Users\user_name\my_file_path\weekly_sales_report_automatic.py using Python from the previously activated Anaconda environment. The > redirects the standard output (stdout) to a file named output.log located in C:\Users\user_name\my_file_path\Logging. This means all output that would normally be printed to the terminal will instead be saved in this log file. 2>&1 redirects the standard error (stderr, which is denoted by 2) to the same location as stdout (1), which means both normal output and error messages will be captured in the output.log file.
To be honest I got a lot of the syntax here from ChatGPT which recommended having a log file.
python C:\Users\user_name\my_file_path\weekly_sales_report_automatic.py > C:\Users\user_name\my_file_path\Logging\output.log 2>&1
Final code:
@echo off
CALL C:\Users\user_name\anaconda3\Scripts\activate.bat C:\Users\user_name\anaconda3
CALL conda activate venv1
python C:\Users\user_name\my_file_path\weekly_sales_report_automatic.py > C:\Users\user_name\my_file_path\Logging\output.log 2>&1
Since this Python script runs automatically, the print statements that would usually appear in the console go to the output.log file instead. Because the batch file uses >, the file is overwritten every time the script runs. So in this example output.log would contain the following:
Folder created at: computer\file\path.
Query results:
day         day_of_the_week  total_purchases  total_expenses  revenue
2024-05-13  Mon              10000            5000            5000
2024-05-14  Tue              15000            2000            13000
2024-05-15  Wed              10000            3000            7000
2024-05-16  Thu              15000            4000            11000
2024-05-17  Fri              20000            5000            15000
2024-05-18  Sat              15000            4000            11000
2024-05-19  Sun              10000            3000            7000
CSV file: Report Tables 5-13-24 to 5-19-24.csv exported successfully to: computer\file\path
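The > redirect replaces output.log on every run, so only the latest week's messages survive. If a history of runs is useful, one option is Python's built-in logging module, which appends to the file by default and can timestamp each line. A minimal sketch, assuming a temporary log path and a logger name (weekly_report) that are placeholders, not part of the script:

```python
import logging
import os
import tempfile

# Temporary location in place of the real Logging folder (assumption).
log_path = os.path.join(tempfile.mkdtemp(), "weekly_report.log")

logger = logging.getLogger("weekly_report")
logger.setLevel(logging.INFO)
# FileHandler opens the file in append mode by default, so earlier runs stay.
handler = logging.FileHandler(log_path, encoding="utf-8")
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
logger.addHandler(handler)

logger.info("Folder created at: %s", r"computer\file\path")
try:
    raise ValueError("example failure")
except ValueError:
    # exception() records the message plus the full traceback.
    logger.exception("An error occurred during query execution")

# Flush and release the file before reading it back.
logger.removeHandler(handler)
handler.close()
with open(log_path, encoding="utf-8") as f:
    print(f.read())
```

With this approach the print statements in the script would become logger.info or logger.exception calls, and the batch file redirect would only need to catch anything unexpected.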
Step 3: Scheduled Python Script
I used Windows Task Scheduler to automatically schedule my Python script to run every Monday at a certain time.
Open Task Scheduler
In the Task Scheduler library, right-click and select "Create Task..." to open the task creation dialog.
General Tab:
Give my task a name, e.g., "Run Management Analysis".
Triggers Tab:
Click "New..." to set up a trigger for the task. This defines when the task should run (e.g., at system startup, on a schedule, etc.).
I set it to run weekly on Mondays at a specific time.
Actions Tab:
Click "New..." to add an action.
Set "Action" to "Start a program".
In "Program/script", browse to or enter the path to the run_weekly_report_script.bat file (I didn't need to add any arguments because my batch file didn't specifically require them)
Conditions and Settings Tabs:
Adjust these settings based on any specific conditions I wanted.
Finish and Test:
Click "OK" to save the task.
Right-click my task and select "Run" to test it immediately to ensure it works as expected.
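The same kind of task can also be registered from the command line with Windows' schtasks tool instead of clicking through the dialog. The sketch below only builds and prints the command; the batch file location and the 09:00 start time are assumptions for illustration, and the function name build_schtasks_command is made up.

```python
import subprocess


def build_schtasks_command(task_name: str, bat_path: str, start_time: str) -> list:
    """Build a schtasks command for a weekly Monday run (Windows only)."""
    return [
        "schtasks", "/Create",
        "/TN", task_name,    # task name shown in Task Scheduler
        "/TR", bat_path,     # program to run: the batch file
        "/SC", "WEEKLY",     # schedule type
        "/D", "MON",         # day of the week
        "/ST", start_time,   # start time in 24-hour HH:MM
    ]


cmd = build_schtasks_command(
    "Run Management Analysis",
    r"C:\Users\user_name\my_file_path\run_weekly_report_script.bat",  # assumed location
    "09:00",  # hypothetical time
)
print(subprocess.list2cmdline(cmd))
# To actually register the task on Windows, uncomment the next line:
# subprocess.run(cmd, check=True)
```

Keeping the command in a script makes the schedule easy to re-create on another machine, though the Task Scheduler dialog remains the simpler route for a one-off setup.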
Reasoning
I wanted to make a few notes with my reasoning:
I used Task Scheduler because it was the easiest to get started with.
I used these specific libraries because I’m familiar with them, they’re easy to use, and widely supported.
I saved the file in a CSV sub-folder because I save the report as an Excel file in the main folder. The reason I don’t save the DataFrame directly as an Excel file is that the date formatting doesn’t turn out correctly that way. I also have to create some charts in Excel for an executive report every week.
Again the actual query and calculations in my script are more complicated. But the purpose of this blog post is to go over the concepts so it’s been changed to focus on learning (and for data privacy reasons).
Future Features
A few things I would want for the future:
Figure out a way to export this to an Excel sheet (with the dates fixed).
Use this Excel sheet to update another Excel sheet automatically.
Save the charts and graphs that are created to the main folder.
What I Learned
Honestly, it was a lot of fun diving more into Python and automation specifically. I learned more about date manipulation in Python and how to run programs on a regular schedule using Windows Task Scheduler.
Conclusion
Overall, my first attempt to automate this saved a lot of time and was a great learning opportunity. Automatically scheduling the script and updating the dates has saved me even more time and gives me an idea of what else I can automate. Hopefully this gives you ideas on what you can automate using Python. This is only the start of my journey using SQL & Python together, and I can’t wait to see what more I can do!