Connect SFTP to BigQuery
Set up the connection from SFTP to BigQuery with code or no code!
Text and video instructions below.
Automating CSV transfers from SFTP to BigQuery: Quick guide
In this article, we’ll show you how to automate the process of transferring data from SFTP to BigQuery using Google Cloud tools and services. The steps include setting up a service account, securely managing SFTP credentials with Secret Manager, deploying a Cloud Run function, and scheduling it for automated execution, followed by a scheduled query in BigQuery. By the end of this tutorial, you’ll have a streamlined system that automates the transfer and ingestion of data from SFTP to Google BigQuery. If you wish to avoid coding, the Netice Data Transfer Platform provides a secure and low-cost no-code solution with a 30-day free trial, or alternatively we can create and provide you with your own custom solution!
Prefer coding?
Code your own solution:
1. Schedule the file transfer from SFTP to Google Cloud Storage with Google Cloud Functions.
2. Load the CSV file from Cloud Storage into a Google BigQuery table.
No-Code Solution
30 Days Free Trial
Effortless, secure data transfers between SFTP and Google BigQuery — no coding required!
- No credit card needed
- Instant setup in minutes
If you prefer to watch the guide as a video:
The video covers the first leg of the automation, from SFTP to Google Cloud Storage; the remaining steps from Cloud Storage to BigQuery are covered in the text guide below.
Before the BigQuery transfer, we start by scheduling the file transfer from SFTP to Google Cloud Storage with Google Cloud Run Functions:
Step 1: Create the Service Account with proper privileges in Google Cloud
First, navigate to the IAM & Admin section in Google Cloud and select “Service Accounts”. Click “Create Service Account” and name it, e.g., sftp-google-cloud.
Assign the following roles so that the service account can handle the transfer on its own (a scripted sketch of granting them follows the list):
- Storage Admin
- BigQuery Admin
- Secret Manager Secret Accessor
- Cloud Functions Invoker
- Cloud Run Invoker (necessary for second-generation Cloud Functions)
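If you prefer to script this step instead of using the console, the role bindings can also be granted programmatically. Below is a minimal sketch, assuming the google-cloud-resource-manager package, a placeholder project ID, and a placeholder service account e-mail; adapt all of these to your own project. The roles match the list above.

# Sketch: grant the roles listed above to the service account programmatically.
# Assumptions: pip install google-cloud-resource-manager, and your local credentials
# are allowed to change the project's IAM policy. Placeholders must be replaced.
from google.cloud import resourcemanager_v3

PROJECT_ID = "your-project"                                            # placeholder
SA_EMAIL = "sftp-google-cloud@your-project.iam.gserviceaccount.com"    # placeholder
ROLES = [
    "roles/storage.admin",
    "roles/bigquery.admin",
    "roles/secretmanager.secretAccessor",
    "roles/cloudfunctions.invoker",
    "roles/run.invoker",  # needed for second-generation Cloud Run functions
]

client = resourcemanager_v3.ProjectsClient()
resource = f"projects/{PROJECT_ID}"

# Read-modify-write of the project IAM policy (simplified: does not merge duplicate bindings)
policy = client.get_iam_policy(request={"resource": resource})
for role in ROLES:
    policy.bindings.add(role=role, members=[f"serviceAccount:{SA_EMAIL}"])
client.set_iam_policy(request={"resource": resource, "policy": policy})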
To transfer data from SFTP to BigQuery, the first step is to get the data to a Cloud Storage bucket:
Step 2: Create a Cloud Storage Bucket that will receive the data files from SFTP
Navigate to Cloud Storage and set up a new bucket. Choose your preferred region and make sure to activate the “Enforce public access prevention” option to enhance security. This bucket will be used to temporarily host the files coming from the SFTP server.
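If you would rather create the bucket from code, here is a minimal sketch using the google-cloud-storage client; the project ID, bucket name, and location are placeholders, and public access prevention is set to “enforced” as described above.

# Sketch: create the landing bucket with public access prevention enforced.
# Assumptions: pip install google-cloud-storage; names and location are placeholders.
from google.cloud import storage

client = storage.Client(project="your-project")           # placeholder project ID
bucket = client.bucket("your-sftp-landing-bucket")         # placeholder bucket name
bucket.iam_configuration.public_access_prevention = "enforced"
new_bucket = client.create_bucket(bucket, location="EU")   # choose your preferred region
print(f"Created bucket {new_bucket.name}")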
Step 3: Set Up Google Cloud Secret Manager to manage your SFTP credentials
Go to the Security section in Google Cloud and open Secret Manager. Activate the Secret Manager API, then create a new secret called “SFTP_KEY”. Insert your SFTP private key into the secret value field to store it securely, allowing the Cloud Run function we create next to retrieve it when needed. If you use password authentication instead, store the password as the secret value and remember to modify the Cloud Run function accordingly in the next step.
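The same step can be scripted with the Secret Manager Python client. The sketch below assumes the google-cloud-secret-manager package, a placeholder project ID, and a local key file path; it creates the SFTP_KEY secret and stores the key as its first version.

# Sketch: create the SFTP_KEY secret and store the private key (or password) in it.
# Assumptions: pip install google-cloud-secret-manager; project ID and key path are placeholders.
from google.cloud import secretmanager

PROJECT_ID = "your-project"                     # placeholder
client = secretmanager.SecretManagerServiceClient()
parent = f"projects/{PROJECT_ID}"

# Create the secret container (run once)
secret = client.create_secret(
    request={
        "parent": parent,
        "secret_id": "SFTP_KEY",
        "secret": {"replication": {"automatic": {}}},
    }
)

# Add the private key (or password, if you use password authentication) as the secret value
with open("sftp_private_key.pem", "rb") as f:   # placeholder path to your key file
    key_bytes = f.read()
client.add_secret_version(request={"parent": secret.name, "payload": {"data": key_bytes}})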
Step 4: Create the Google Cloud Run Function that transfers data files from SFTP to Google Cloud Storage
main.py: the full code is provided in the code section at the end of this guide.
Head to Cloud Run Functions and create a new function. Choose HTTPS as the trigger type and enable authentication. Depending on the size of your data file transfer(s), you should usually set the memory to either 256 MB or 512 MB. Set the timeout to 540 seconds, which is the maximum in Cloud Run Functions. Select the service account we created in Step 1 from the available options. Use Python 3.12 for the runtime and set the entry point to “process_request”. Upload the required files (main.py, requirements.txt, and .env) to complete the setup.
requirements.txt:
python-dotenv
paramiko
pandas
google-cloud-storage
google-cloud-secret-manager
flask>=1.0,<3.0

.env:
SFTP_SERVER=*Your SFTP server host*
SFTP_USER=*Your SFTP username*
SFTP_KEY_SECRET=SFTP_KEY
GCS_BUCKET_NAME=*Your GCS bucket name (created in Step 2)*
BASE_DIRECTORY=/ (if root)
GOOGLE_CLOUD_PROJECT=*Your Google Cloud Project ID*
Step 5: Deploy the Cloud Run Function
Deploy the Cloud Run Function and wait for the build to complete. If the deployment fails, check the build logs for missing dependencies or syntax errors.
Step 6: Schedule the Cloud Run Function with Google Cloud Scheduler
Activate the Cloud Scheduler API and create a new scheduled job. Set it to run daily at your preferred time. Choose HTTP as the target type, input the URL from your Cloud Run function’s “Trigger” tab, and configure the authorization header with an OIDC token. This job automates the data transfer from SFTP to Google Cloud Storage, which is the prerequisite for writing the data to Google BigQuery in Step 8. A scripted sketch of creating the job follows.
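For reference, here is a minimal sketch of creating the same job with the Cloud Scheduler Python client. It assumes the google-cloud-scheduler package, and the project ID, region, job name, schedule, time zone, function URL, and service account e-mail below are all placeholders you should replace with your own values.

# Sketch: create the daily Cloud Scheduler job that triggers the function over HTTP with an OIDC token.
# Assumptions: pip install google-cloud-scheduler; all names and values below are placeholders.
from google.cloud import scheduler_v1

PROJECT_ID = "your-project"                                            # placeholder
REGION = "europe-west1"                                                # placeholder
FUNCTION_URL = "https://your-function-url.a.run.app"                   # from the function's Trigger tab
SA_EMAIL = "sftp-google-cloud@your-project.iam.gserviceaccount.com"    # service account from Step 1

client = scheduler_v1.CloudSchedulerClient()
parent = f"projects/{PROJECT_ID}/locations/{REGION}"

job = scheduler_v1.Job(
    name=f"{parent}/jobs/sftp-to-gcs-daily",
    schedule="0 6 * * *",           # daily at 06:00
    time_zone="Europe/Helsinki",    # placeholder: pick your time zone
    http_target=scheduler_v1.HttpTarget(
        uri=FUNCTION_URL,
        http_method=scheduler_v1.HttpMethod.POST,
        oidc_token=scheduler_v1.OidcToken(service_account_email=SA_EMAIL),
    ),
)
client.create_job(request={"parent": parent, "job": job})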
Step 7: Test the SFTP to GCS Cloud Run Function
Force a run (for example with “Force run” in Cloud Scheduler) to test the SFTP to Google Cloud Storage transfer. Check that the run does not fail and that the file(s) are successfully transferred to your Cloud Storage bucket. If you prefer to trigger the function from code, see the sketch below.
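The following sketch invokes the deployed function once with an authenticated HTTP request. It assumes the google-auth and requests packages, a placeholder function URL, and credentials (for example a service account key or the metadata server) from which an ID token can be minted.

# Sketch: invoke the deployed function once with an ID token to verify the transfer end to end.
# Assumptions: pip install google-auth requests; the URL below is a placeholder, and your
# credentials are allowed to invoke the function.
import requests
import google.auth.transport.requests
import google.oauth2.id_token

FUNCTION_URL = "https://your-function-url.a.run.app"   # placeholder: your function's trigger URL

auth_request = google.auth.transport.requests.Request()
id_token = google.oauth2.id_token.fetch_id_token(auth_request, FUNCTION_URL)

response = requests.post(
    FUNCTION_URL,
    headers={"Authorization": f"Bearer {id_token}"},
    json={"days_threshold": 1},   # same parameter the function reads in process_request
    timeout=600,
)
print(response.status_code, response.text)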
Now, let's load the CSV file from Cloud Storage into a Google BigQuery table.
Step 8: Create a scheduled query in BigQuery for transferring the CSV file(s) from the specified GCS bucket to BigQuery
Now, all that is left is to automate the data movement from your Google Cloud Storage bucket to BigQuery. Go to BigQuery and create the scheduled query:
CREATE OR REPLACE EXTERNAL TABLE `your-project.your_dataset.external_table`
OPTIONS (
  format = 'CSV',
  uris = ['gs://your-bucket-name/path-to-file.csv']
);
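If you want to confirm the result from code, a quick check with the BigQuery Python client might look like the sketch below. It assumes the google-cloud-bigquery package and uses the same placeholder project, dataset, and table names as the query above.

# Sketch: verify from Python that the external table reads the CSV file(s) in the bucket.
# Assumptions: pip install google-cloud-bigquery; names match the placeholders used above.
from google.cloud import bigquery

client = bigquery.Client(project="your-project")   # placeholder project ID
query = "SELECT COUNT(*) AS row_count FROM `your-project.your_dataset.external_table`"
for row in client.query(query).result():
    print(f"Rows visible through the external table: {row.row_count}")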
All done with SFTP to BigQuery!
By following these steps, you’ve automated the process of transferring data from an SFTP server to BigQuery. The data now flows from your SFTP server to Google Cloud Storage using Google Cloud Run Functions, Secret Manager, and Cloud Scheduler, and is then loaded into Google BigQuery with a scheduled query.
Free trial for 30 days - instant access
Set up your SFTP & Google Cloud connection in minutes - no credit card needed
Your free 30-day trial includes:
- Full access: Explore all features of Netice and automate your data transfers between SFTP, Google Cloud Storage and BigQuery
- Effortless Setup: No coding required—configure your integration in minutes.
- Security and Compliance: Netice is built with enterprise-grade security and GDPR compliance.
- No Credit Card Required: No credit card needed to get started, no hidden fees.
- Unlimited Support: Our dedicated support team is available on request to guide you through setting up your automated data transfers.
- No Purchase Commitment: You are not billed at any point before you make the decision to purchase.
Create your account
SFTP & Google Cloud Storage
SFTP & Google BigQuery
Need your custom solution?
We can connect SFTP to BigQuery for you.
Get the SFTP to BigQuery code sent to your email!
SFTP to Google Cloud Storage - code:
Cloud functions - main.py
import os
import paramiko
import io
import pandas as pd
import stat
from google.cloud import storage, secretmanager
from flask import jsonify
from datetime import datetime
from dotenv import load_dotenv
import logging

# Load environment variables from .env file
load_dotenv()

# Setup logging
logging.basicConfig(level=logging.DEBUG)


def get_secret(secret_id):
    logging.debug(f"Retrieving secret for secret_id: {secret_id}")
    try:
        client = secretmanager.SecretManagerServiceClient()
        project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
        logging.debug(f"Project ID: {project_id}")
        secret_name = f"projects/{project_id}/secrets/{secret_id}/versions/latest"
        logging.debug(f"Secret name: {secret_name}")
        response = client.access_secret_version(request={"name": secret_name})
        secret_value = response.payload.data
        if isinstance(secret_value, bytes):
            secret_value = secret_value.decode('utf-8')
        # Do not log the secret value itself
        logging.debug("Secret value retrieved and decoded")
        return secret_value
    except Exception as e:
        logging.error(f"Error retrieving secret {secret_id}: {e}")
        return None


def normalize_private_key(private_key):
    # Convert escaped "\n" sequences into real newlines so paramiko can parse the key
    logging.debug("Normalizing private key")
    lines = private_key.strip().split("\\n")
    normalized_key = "\n".join(lines)
    return normalized_key


def create_sftp_connection(host, username, private_key_pem):
    logging.debug(f"Creating SFTP connection to host: {host} with username: {username}")
    try:
        ssh_client = paramiko.SSHClient()
        ssh_client.load_system_host_keys()
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        if private_key_pem is None:
            raise ValueError("The private key is None. Check the secret retrieval.")
        logging.debug(f"Private key PEM type: {type(private_key_pem)}")
        private_key_pem = normalize_private_key(private_key_pem)
        if isinstance(private_key_pem, str):
            private_key_bytes = private_key_pem.encode('utf-8')
        else:
            private_key_bytes = private_key_pem
        logging.debug(f"Keyfile type: {type(private_key_bytes)}, length: {len(private_key_bytes)}")
        private_key_file = io.StringIO(private_key_pem)
        private_key = paramiko.RSAKey.from_private_key(private_key_file)
        logging.debug("Private key loaded successfully")
        logging.debug(f"Attempting to connect to SFTP server {host} as {username}")
        ssh_client.connect(host, username=username, pkey=private_key, banner_timeout=200)
        sftp_client = ssh_client.open_sftp()
        logging.debug('SFTP Connection Successful')
        return sftp_client, ssh_client
    except paramiko.ssh_exception.SSHException as e:
        logging.error(f"SSHException creating SFTP connection: {e}")
    except Exception as e:
        logging.error(f"Error creating SFTP connection: {e}")
    return None, None


def retrieve_sftp_files(sftp_client, base_directory, relative_path='', file_details=None):
    logging.debug(f"Retrieving SFTP files from base_directory: {base_directory}, relative_path: {relative_path}")
    if file_details is None:
        file_details = []
    current_directory = os.path.join(base_directory, relative_path)
    logging.debug(f"Current directory: {current_directory}")
    try:
        for entry in sftp_client.listdir_attr(current_directory):
            logging.debug(f"Entry: {entry.filename}, is directory: {stat.S_ISDIR(entry.st_mode)}")
            if stat.S_ISDIR(entry.st_mode):  # if folder, recurse into it
                new_path = os.path.join(relative_path, entry.filename)
                retrieve_sftp_files(sftp_client, base_directory, new_path, file_details)
            else:  # if file, record its details
                filename = entry.filename
                filepath = os.path.join(relative_path, filename)
                last_modified = datetime.fromtimestamp(entry.st_mtime)
                days_since_modified = (datetime.now() - last_modified).days
                file_details.append({'filename': filename, 'filepath': filepath, 'days_since_modified': days_since_modified})
                logging.debug(f"File: {filename}, Path: {filepath}, Days since modified: {days_since_modified}")
        logging.info(f"Retrieved {len(file_details)} file details.")
    except Exception as e:
        logging.error(f"Error retrieving SFTP files: {e}")
    return file_details


def fetch_file(sftp_client, remote_filepath, local_filepath):
    logging.debug(f"Fetching file from remote_filepath: {remote_filepath} to local_filepath: {local_filepath}")
    try:
        sftp_client.get(remote_filepath, local_filepath)
        logging.info(f"Fetched file from {remote_filepath} to {local_filepath}.")
    except Exception as e:
        logging.error(f"Error fetching file {remote_filepath}: {e}")


def transfer_to_gcs(bucket_name, local_filepath, remote_blob_name):
    logging.debug(f"Transferring file to GCS bucket: {bucket_name}, local_filepath: {local_filepath}, remote_blob_name: {remote_blob_name}")
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(remote_blob_name)
        blob.upload_from_filename(local_filepath)
        logging.info(f"Transferred file {local_filepath} to GCS bucket {bucket_name} as {remote_blob_name}.")
    except Exception as e:
        logging.error(f"Error transferring file to GCS: {e}")


def identify_new_files(sftp_client, base_directory, days_threshold):
    logging.debug(f"Identifying new files in base_directory: {base_directory} with days_threshold: {days_threshold}")
    try:
        sftp_files = retrieve_sftp_files(sftp_client=sftp_client, base_directory=base_directory, relative_path='', file_details=None)
        sftp_files_df = pd.DataFrame(sftp_files)
        logging.debug(f"SFTP files dataframe: {sftp_files_df}")
        if days_threshold:
            sftp_files_df = sftp_files_df[sftp_files_df['days_since_modified'] <= days_threshold]
        bucket_name = os.getenv("GCS_BUCKET_NAME")
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blobs = bucket.list_blobs()
        gcs_files = [(blob.name.split('/')[-1], blob.name) for blob in blobs]
        gcs_files_df = pd.DataFrame(gcs_files, columns=['gcs_filename', 'gcs_filepath'])
        logging.debug(f"GCS files dataframe: {gcs_files_df}")
        # Keep only the SFTP files that do not already exist in the bucket
        merged_df = sftp_files_df.merge(gcs_files_df, how='left', left_on=['filepath'], right_on=['gcs_filepath'])
        merged_df = merged_df[merged_df['gcs_filepath'].isna()]
        merged_df.sort_values(by='days_since_modified', inplace=True, ascending=True)
        logging.info(f"Identified {len(merged_df)} new files to be processed.")
        return merged_df[['filename', 'filepath']]
    except Exception as e:
        logging.error(f"Error identifying new files: {e}")
        return pd.DataFrame(columns=['filename', 'filepath'])


def process_request(request):
    logging.debug("Processing request")
    try:
        base_directory = os.getenv("BASE_DIRECTORY")
        logging.debug(f"Base directory: {base_directory}")
        sftp_host = os.getenv("SFTP_SERVER")
        sftp_user = os.getenv("SFTP_USER")
        sftp_private_key_secret_name = os.getenv("SFTP_KEY_SECRET")
        logging.debug(f"SFTP host: {sftp_host}, SFTP user: {sftp_user}, SFTP key secret name: {sftp_private_key_secret_name}")
        sftp_private_key = get_secret(sftp_private_key_secret_name)
        if sftp_private_key is None:
            logging.error("Failed to retrieve the SFTP private key")
            return jsonify({'error': 'Failed to retrieve the SFTP private key'}), 500
        gcs_bucket = os.getenv("GCS_BUCKET_NAME")
        logging.debug(f"GCS bucket: {gcs_bucket}")
        request_json = request.get_json(silent=True)
        request_args = request.args
        if request_json and 'days_threshold' in request_json:
            days_threshold = request_json['days_threshold']
        elif request_args and 'days_threshold' in request_args:
            days_threshold = request_args['days_threshold']
        else:
            logging.warning('Days Parameter Error, Selecting Default (1)')
            days_threshold = 1
        days_threshold = int(days_threshold)
        logging.debug(f"Processing with days_threshold: {days_threshold}")
        sftp_client, ssh_client = create_sftp_connection(host=sftp_host, username=sftp_user, private_key_pem=sftp_private_key)
        if sftp_client is None or ssh_client is None:
            logging.error("Failed to create SFTP connection")
            return jsonify({'error': 'Failed to create SFTP connection'}), 500
        new_files_df = identify_new_files(sftp_client, base_directory, days_threshold)
        logging.info(f"{len(new_files_df)} new files are ready to download.")
        for filename, filepath in new_files_df[['filename', 'filepath']].values:
            fetch_file(sftp_client, base_directory + "/" + filepath, '/tmp/' + filename)
            transfer_to_gcs(gcs_bucket, '/tmp/' + filename, filename)  # Ensure the blob name is appropriate for your use case
        sftp_client.close()
        ssh_client.close()
        return jsonify({'message': 'Process completed successfully', 'files': new_files_df.to_dict(orient='records')})
    except Exception as e:
        logging.error(f"Error in processing request: {e}")
        return jsonify({'error': str(e)}), 500
Cloud functions - .env
SFTP_SERVER=your sftp server
SFTP_USER=your sftp user
SFTP_KEY_SECRET=SFTP_KEY
GCS_BUCKET_NAME=your bucket name
BASE_DIRECTORY=/
GOOGLE_CLOUD_PROJECT=your project
Cloud functions - requirements.txt
python-dotenv
paramiko
pandas
google-cloud-storage
google-cloud-secret-manager
flask>=1.0,<3.0