Connect SFTP to Google Cloud Storage
Set up in 15 minutes, with code or no code!
Automating SFTP to Google Cloud Storage Transfers: A Comprehensive Guide
In this blog post, we’ll guide you through automating the transfer of files from an SFTP server to Google Cloud Storage using Google Cloud services. This process involves setting up a service account, creating a Cloud Storage bucket, securely storing SFTP credentials in Secret Manager, deploying a Cloud Function, and scheduling the function to run automatically. By the end of this guide, you’ll have a fully automated system for transferring files from an SFTP server to Google Cloud Storage.
Step 1: Create the Service Account
First, navigate to the IAM section and select “Service Accounts.” Click “Create Service Account” and name it “sftp-google-cloud.” Assign the following roles to handle the transfer independently:
- Storage Admin
- BigQuery Admin (optional, for further transfer to BigQuery)
- Secret Manager Secret Accessor
- Cloud Functions Invoker
- Cloud Run Invoker (necessary for second-generation Cloud Functions)
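If you prefer the command line to the console, the role assignments above can be sketched as a small helper that prints one gcloud command per role. This is a minimal sketch, not part of the original setup: the role IDs are the ones I understand to correspond to the console role names listed above, and "my-project" is a placeholder project ID.

```python
# Role IDs assumed to match the console role names in the list above.
ROLES = [
    "roles/storage.admin",
    "roles/bigquery.admin",  # optional, only for a later transfer to BigQuery
    "roles/secretmanager.secretAccessor",
    "roles/cloudfunctions.invoker",
    "roles/run.invoker",
]


def iam_binding_commands(project_id, account_name="sftp-google-cloud",
                         include_bigquery=False):
    """Build one `gcloud projects add-iam-policy-binding` command per role."""
    member = f"serviceAccount:{account_name}@{project_id}.iam.gserviceaccount.com"
    roles = [r for r in ROLES
             if include_bigquery or r != "roles/bigquery.admin"]
    return [
        f"gcloud projects add-iam-policy-binding {project_id} "
        f"--member={member} --role={role}"
        for role in roles
    ]


if __name__ == "__main__":
    # "my-project" is a placeholder; replace it with your own project ID.
    for command in iam_binding_commands("my-project"):
        print(command)
```

Printing the commands instead of running them lets you review each binding before applying it.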
Step 2: Create a Cloud Storage Bucket to receive files from SFTP
Go to Cloud Storage and create a new bucket. Select your region and ensure “Enforce public access prevention on this bucket” is enabled for security. This bucket will store the files transferred from the SFTP server.
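The same bucket can also be created from Python. The sketch below assumes the google-cloud-storage client library; the helper name create_private_bucket, the bucket name, and the region are placeholders chosen for illustration. The client is passed in as a parameter so the helper can be tried with a stand-in object first.

```python
def create_private_bucket(client, bucket_name, location):
    """Create a bucket with public access prevention enforced.

    `client` is expected to be a google.cloud.storage.Client (assumption).
    """
    bucket = client.bucket(bucket_name)
    # Equivalent of ticking "Enforce public access prevention on this bucket".
    bucket.iam_configuration.public_access_prevention = "enforced"
    return client.create_bucket(bucket, location=location)


# Usage (needs credentials and the google-cloud-storage package):
# from google.cloud import storage
# create_private_bucket(storage.Client(), "your-bucket-name", "europe-north1")
```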
Step 3: Set Up Secret Manager
Navigate to Security and select Secret Manager. Enable the Secret Manager API and create a new secret named “SFTP_KEY.” Paste your SFTP private key into the secret value field to securely store it for the Cloud Function to access.
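Before pasting the key, it can help to confirm that the text really is a PEM private key and not, for example, the public key. The check below is a hypothetical helper, not part of the function code, and it only inspects the BEGIN and END marker lines. It assumes the key was pasted with real line breaks.

```python
def looks_like_private_key(text):
    """Return True if text has matching PEM BEGIN/END private-key markers."""
    lines = [line.strip() for line in text.strip().splitlines() if line.strip()]
    if len(lines) < 3:
        return False
    first, last = lines[0], lines[-1]
    return (
        first.startswith("-----BEGIN ")
        and first.endswith("PRIVATE KEY-----")
        and last.startswith("-----END ")
        and last.endswith("PRIVATE KEY-----")
        # The key type after BEGIN must match the one after END.
        and first[len("-----BEGIN "):] == last[len("-----END "):]
    )
```

Note that the function code in this guide loads the key with paramiko.RSAKey, so the stored key needs to be an RSA key.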
Step 4: Create the Cloud Function that transfers from SFTP to Google Cloud Storage
Go to Cloud Functions and create a new function. Set the trigger type to HTTPS and require authentication. Allocate at least 512 MB of memory and set a timeout of up to 540 seconds. Select your service account from the drop-down menu. Use Python 3.12 as the runtime and set the entry point to process_request. Upload the necessary files (main.py, requirements.txt, and .env) to the Cloud Function.
Step 5: Deploy the Cloud Function
Deploy the Cloud Function to handle the actual transfer of files from the SFTP server to Google Cloud Storage.
Step 6: Schedule the Function with Cloud Scheduler
Enable the Cloud Scheduler API and create a new scheduler job named “SFTP_Gcloud_scheduler.” Set it to trigger daily at a specified time. Configure the target type as HTTP, use the URL from your Cloud Function’s trigger tab, and set the auth header as OIDC token. This scheduler will automate the transfer process.
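Cloud Scheduler expects the schedule in unix-cron format. As a minimal sketch, the two hypothetical helpers below build a daily cron expression and an optional JSON body carrying the days_threshold parameter that process_request reads. For the function to parse that body, the job is assumed to also send a Content-Type: application/json header.

```python
import json


def daily_cron(hour, minute=0):
    """Return a unix-cron expression that fires once a day at hour:minute."""
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute 0-59")
    # Field order: minute, hour, day of month, month, day of week.
    return f"{minute} {hour} * * *"


def scheduler_body(days_threshold=1):
    """Return the JSON request body for the scheduler job."""
    return json.dumps({"days_threshold": days_threshold})


if __name__ == "__main__":
    print(daily_cron(6, 30))
    print(scheduler_body(1))
```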
Step 7: Test the SFTP to Google Cloud Storage Setup
Activate the Cloud Scheduler job and force a run to test the SFTP to Google Cloud Storage transfer. Verify the files are successfully transferred to your Cloud Storage bucket.
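The verification can be done by eye in the console, or sketched in code by comparing the paths you expect with the object names in the bucket. The helper name missing_from_bucket is hypothetical and the example paths are made up for illustration.

```python
def missing_from_bucket(expected_paths, blob_names):
    """Return the expected paths that have no matching object in the bucket."""
    present = set(blob_names)
    return sorted(path for path in expected_paths if path not in present)


# Usage (needs credentials and the google-cloud-storage package):
# from google.cloud import storage
# names = [b.name for b in storage.Client().list_blobs("your-bucket-name")]
# print(missing_from_bucket(["reports/a.csv", "b.csv"], names))
```

An empty list means every expected file arrived.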
By following these steps, you’ve automated the process of transferring data from an SFTP server to Google Cloud Storage using Google Cloud Functions, Secret Manager, and Cloud Scheduler.
SFTP to Google Cloud Storage - code:
Cloud functions - main.py
import os
import paramiko
import io
import pandas as pd
import stat
from google.cloud import storage, secretmanager
from flask import jsonify
from datetime import datetime
from dotenv import load_dotenv
import logging
# Load environment variables from .env file
load_dotenv()
# Setup logging
logging.basicConfig(level=logging.DEBUG)
def get_secret(secret_id):
    """Return the latest version of a Secret Manager secret as text, or None on failure."""
    logging.debug(f"Retrieving secret for secret_id: {secret_id}")
    try:
        client = secretmanager.SecretManagerServiceClient()
        project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
        logging.debug(f"Project ID: {project_id}")
        secret_name = f"projects/{project_id}/secrets/{secret_id}/versions/latest"
        logging.debug(f"Secret name: {secret_name}")
        response = client.access_secret_version(request={"name": secret_name})
        # Do not log the response or its payload: both contain the private key.
        logging.debug("Secret response received")
        secret_value = response.payload.data
        if isinstance(secret_value, bytes):
            secret_value = secret_value.decode('utf-8')
        return secret_value
    except Exception as e:
        logging.error(f"Error retrieving secret {secret_id}: {e}")
        return None

def normalize_private_key(private_key):
    # Turn literal "\n" sequences (backslash followed by n) into real line breaks.
    logging.debug("Normalizing private key")
    lines = private_key.strip().split("\\n")
    normalized_key = "\n".join(lines)
    # The key itself is deliberately not logged.
    return normalized_key

def create_sftp_connection(host, username, private_key_pem):
    logging.debug(f"Creating SFTP connection to host: {host} with username: {username}")
    try:
        ssh_client = paramiko.SSHClient()
        ssh_client.load_system_host_keys()
        # AutoAddPolicy accepts unknown host keys without verification.
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        if private_key_pem is None:
            raise ValueError("The private key is None. Check the secret retrieval.")
        logging.debug(f"Private key PEM type: {type(private_key_pem)}")
        private_key_pem = normalize_private_key(private_key_pem)
        if isinstance(private_key_pem, str):
            private_key_bytes = private_key_pem.encode('utf-8')
        else:
            private_key_bytes = private_key_pem
        logging.debug(f"Keyfile type: {type(private_key_bytes)}, length: {len(private_key_bytes)}")
        # RSAKey.from_private_key expects a text file-like object.
        private_key_file = io.StringIO(private_key_pem)
        private_key = paramiko.RSAKey.from_private_key(private_key_file)
        logging.debug("Private key loaded successfully")
        logging.debug(f"Attempting to connect to SFTP server {host} as {username}")
        ssh_client.connect(host, username=username, pkey=private_key, banner_timeout=200)
        sftp_client = ssh_client.open_sftp()
        logging.debug('SFTP Connection Successful')
        return sftp_client, ssh_client
    except paramiko.ssh_exception.SSHException as e:
        logging.error(f"SSHException creating SFTP connection: {e}")
    except Exception as e:
        logging.error(f"Error creating SFTP connection: {e}")
    return None, None

def retrieve_sftp_files(sftp_client, base_directory, relative_path='', file_details=None):
    logging.debug(f"Retrieving SFTP files from base_directory: {base_directory}, relative_path: {relative_path}")
    if file_details is None:
        file_details = []
    current_directory = os.path.join(base_directory, relative_path)
    logging.debug(f"Current directory: {current_directory}")
    try:
        for entry in sftp_client.listdir_attr(current_directory):
            logging.debug(f"Entry: {entry.filename}, is directory: {stat.S_ISDIR(entry.st_mode)}")
            if stat.S_ISDIR(entry.st_mode):  # if folder
                new_path = os.path.join(relative_path, entry.filename)
                retrieve_sftp_files(sftp_client, base_directory, new_path, file_details)
            else:  # if file
                filename = entry.filename
                filepath = os.path.join(relative_path, filename)
                last_modified = datetime.fromtimestamp(entry.st_mtime)
                days_since_modified = (datetime.now() - last_modified).days
                file_details.append({'filename': filename, 'filepath': filepath, 'days_since_modified': days_since_modified})
                logging.debug(f"File: {filename}, Path: {filepath}, Days since modified: {days_since_modified}")
        logging.info(f"Retrieved {len(file_details)} file details.")
    except Exception as e:
        logging.error(f"Error retrieving SFTP files: {e}")
    return file_details

def fetch_file(sftp_client, remote_filepath, local_filepath):
    logging.debug(f"Fetching file from remote_filepath: {remote_filepath} to local_filepath: {local_filepath}")
    try:
        sftp_client.get(remote_filepath, local_filepath)
        logging.info(f"Fetched file from {remote_filepath} to {local_filepath}.")
    except Exception as e:
        logging.error(f"Error fetching file {remote_filepath}: {e}")

def transfer_to_gcs(bucket_name, local_filepath, remote_blob_name):
    logging.debug(f"Transferring file to GCS bucket: {bucket_name}, local_filepath: {local_filepath}, remote_blob_name: {remote_blob_name}")
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(remote_blob_name)
        blob.upload_from_filename(local_filepath)
        logging.info(f"Transferred file {local_filepath} to GCS bucket {bucket_name} as {remote_blob_name}.")
    except Exception as e:
        logging.error(f"Error transferring file to GCS: {e}")

def identify_new_files(sftp_client, base_directory, days_threshold):
    logging.debug(f"Identifying new files in base_directory: {base_directory} with days_threshold: {days_threshold}")
    try:
        sftp_files = retrieve_sftp_files(sftp_client=sftp_client, base_directory=base_directory, relative_path='', file_details=None)
        sftp_files_df = pd.DataFrame(sftp_files)
        logging.debug(f"SFTP files dataframe: {sftp_files_df}")
        # An empty listing has no columns, so stop before filtering on them.
        if sftp_files_df.empty:
            logging.info("No files found on the SFTP server.")
            return pd.DataFrame(columns=['filename', 'filepath'])
        if days_threshold:
            sftp_files_df = sftp_files_df[sftp_files_df['days_since_modified'] <= days_threshold]
        bucket_name = os.getenv("GCS_BUCKET_NAME")
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blobs = bucket.list_blobs()
        gcs_files = [(blob.name.split('/')[-1], blob.name) for blob in blobs]
        gcs_files_df = pd.DataFrame(gcs_files, columns=['gcs_filename', 'gcs_filepath'])
        logging.debug(f"GCS files dataframe: {gcs_files_df}")
        # Keep only the SFTP files whose path has no matching object in the bucket.
        merged_df = sftp_files_df.merge(gcs_files_df, how='left', left_on=['filepath'], right_on=['gcs_filepath'])
        merged_df = merged_df[merged_df['gcs_filepath'].isna()]
        merged_df.sort_values(by='days_since_modified', inplace=True, ascending=True)
        logging.info(f"Identified {len(merged_df)} new files to be processed.")
        return merged_df[['filename', 'filepath']]
    except Exception as e:
        logging.error(f"Error identifying new files: {e}")
        return pd.DataFrame(columns=['filename', 'filepath'])

def process_request(request):
    logging.debug("Processing request")
    try:
        base_directory = os.getenv("BASE_DIRECTORY")
        logging.debug(f"Base directory: {base_directory}")
        sftp_host = os.getenv("SFTP_SERVER")
        sftp_user = os.getenv("SFTP_USER")
        sftp_private_key_secret_name = os.getenv("SFTP_KEY_SECRET")
        logging.debug(f"SFTP host: {sftp_host}, SFTP user: {sftp_user}, SFTP key secret name: {sftp_private_key_secret_name}")
        sftp_private_key = get_secret(sftp_private_key_secret_name)
        if sftp_private_key is None:
            logging.error("Failed to retrieve the SFTP private key")
            return jsonify({'error': 'Failed to retrieve the SFTP private key'}), 500
        gcs_bucket = os.getenv("GCS_BUCKET_NAME")
        logging.debug(f"GCS bucket: {gcs_bucket}")
        # days_threshold may arrive in the JSON body or as a query parameter.
        request_json = request.get_json(silent=True)
        request_args = request.args
        if request_json and 'days_threshold' in request_json:
            days_threshold = request_json['days_threshold']
        elif request_args and 'days_threshold' in request_args:
            days_threshold = request_args['days_threshold']
        else:
            logging.warning('Days Parameter Error, Selecting Default (1)')
            days_threshold = 1
        days_threshold = int(days_threshold)
        logging.debug(f"Processing with days_threshold: {days_threshold}")
        sftp_client, ssh_client = create_sftp_connection(host=sftp_host, username=sftp_user, private_key_pem=sftp_private_key)
        if sftp_client is None or ssh_client is None:
            logging.error("Failed to create SFTP connection")
            return jsonify({'error': 'Failed to create SFTP connection'}), 500
        new_files_df = identify_new_files(sftp_client, base_directory, days_threshold)
        logging.info(f"{len(new_files_df)} new files are ready to download.")
        for filename, filepath in new_files_df[['filename', 'filepath']].values:
            local_path = '/tmp/' + filename
            fetch_file(sftp_client, os.path.join(base_directory, filepath), local_path)
            # Use the relative path as the blob name so it matches the 'gcs_filepath'
            # comparison in identify_new_files. Uploading under the bare filename would
            # make files in subdirectories look new on every run.
            transfer_to_gcs(gcs_bucket, local_path, filepath)
        sftp_client.close()
        ssh_client.close()
        return jsonify({'message': 'Process completed successfully', 'files': new_files_df.to_dict(orient='records')})
    except Exception as e:
        logging.error(f"Error in processing request: {e}")
        return jsonify({'error': str(e)}), 500

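In process_request, the days_threshold value goes straight to int(), so a non-numeric value such as "abc" ends up in the generic exception handler and returns a 500. If you would rather fall back to the default, a stricter parser could look like the sketch below. The helper name parse_days_threshold is hypothetical and is not part of main.py above.

```python
def parse_days_threshold(json_body, query_args, default=1):
    """Read days_threshold from the JSON body or the query args.

    Falls back to `default` when the value is missing, not an
    integer, or negative.
    """
    raw = None
    if json_body and "days_threshold" in json_body:
        raw = json_body["days_threshold"]
    elif query_args and "days_threshold" in query_args:
        raw = query_args["days_threshold"]
    if raw is None:
        return default
    try:
        value = int(raw)
    except (TypeError, ValueError):
        return default
    return value if value >= 0 else default


# Inside process_request this would replace the if/elif/else block:
# days_threshold = parse_days_threshold(request.get_json(silent=True), request.args)
```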
Cloud functions - .env
SFTP_SERVER=your sftp server
SFTP_USER=your sftp user
SFTP_KEY_SECRET=SFTP_KEY
GCS_BUCKET_NAME=your bucket name
BASE_DIRECTORY=/
GOOGLE_CLOUD_PROJECT=your project
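A missing value in .env only shows up later as a failed connection or a failed secret lookup. As a minimal sketch, a hypothetical startup check like the one below reports which of the variables listed above are unset or empty.

```python
import os

# The variable names used by main.py and listed in the .env file above.
REQUIRED_VARS = (
    "SFTP_SERVER",
    "SFTP_USER",
    "SFTP_KEY_SECRET",
    "GCS_BUCKET_NAME",
    "BASE_DIRECTORY",
    "GOOGLE_CLOUD_PROJECT",
)


def missing_settings(environ=None):
    """Return the names of required variables that are unset or empty."""
    if environ is None:
        environ = os.environ
    return [name for name in REQUIRED_VARS if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_settings()
    if missing:
        print("Missing settings: " + ", ".join(missing))
```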
Cloud functions - requirements.txt
python-dotenv
paramiko
pandas
google-cloud-storage
google-cloud-secret-manager
flask>=1.0,<3.0