Postgres to Snowflake — Migrate Real-time and Historical data
A cloud data warehouse like Snowflake is designed for scalable business intelligence and analytics. There are various reasons to move from a traditional data warehouse to a cloud-based one, such as handling big data and improving speed and performance. However, our major reason for this migration was to accommodate end users. We want our users to run multiple queries against terabytes of data simultaneously and receive results in seconds.
In Postgres hosted on AWS RDS, the data retrieval time was 13 minutes 23 seconds for 800k records. On the other hand, I ran the same query on Snowflake and got the result in just 9 seconds.
One of our challenges during the migration was to keep the existing infrastructure running on Postgres as is and replicate the changes happening in the Postgres DB to Snowflake within seconds. I tried multiple solutions using Kinesis Data Streams and Firehose and wrote the transformations using Lambda functions. The process was quite complex, and the latency was high due to the multiple processes involved. As our ETL was already running on Postgres, I just had to replicate the historical and incremental data from Postgres to Snowflake. So, I used the following tech stack to implement the solution.
Tools & Technologies used
- RDS Postgres
- Snowflake
- AWS DMS
- AWS S3
- PgAdmin
Please note that the versions of the above tech stack play a vital role in the migration. I faced multiple issues caused by version mismatches and by features that only specific versions of an AWS service support.
While configuring DMS, please use one engine version lower than the latest version, i.e. 3.4.6. I will remind you at every step regarding which versions and options to choose.
Why I chose AWS DMS
AWS DMS is an AWS cloud service created to migrate data in a variety of ways: to the AWS cloud, from on-premises or cloud hosted data stores. AWS DMS can migrate all kinds of data ranging from relational databases, data warehouses, NoSQL databases, and other types of data stores. It is quite versatile and can handle one-time data migration or perform continual data replication with ongoing changes, syncing the source and target.
My major reasons are the ease of scaling in the future and the cost of migration compared with other awesome services like Fivetran and Matillion.
If you are implementing the pipeline for the first time, please follow the steps below and choose as few extra options as possible to get the pipeline running. Once your POC pipeline is ready, you can easily reconfigure it as per your need.
First, we will replicate the incremental and historical data from Postgres to AWS S3 and then we will create a pipeline to migrate the data from AWS S3 to Snowflake.
Steps to replicate data from Postgres to AWS S3
Step 1 —
First, log in to your AWS account.
Open RDS → Parameter Groups → Create Parameter Group.
- Select DB family — Postgres along with its correct version.
- In type, select DB Cluster Parameter Group.
- Provide the Group Name as per your naming standards and Create.
- In the new Parameter Group, set the rds.logical_replication parameter in your DB cluster parameter group to 1.
- The wal_sender_timeout parameter ends replication connections that are inactive longer than the specified number of milliseconds. The default is 60000 milliseconds (60 seconds). Setting the value to 0 (zero) disables the timeout mechanism and is a valid setting for DMS.
- Set the shared_preload_libraries parameter to pglogical in the same RDS parameter group.
Step 2 —
- Apply Parameter Group to your DB Cluster under the Additional Configuration.
- Select almost instantaneous as an option and REBOOT the database.
Step 3—
Go to PgAdmin (I was using PgAdmin4) and run the queries below. Please note that these queries should be executed as a superuser/admin.
- create extension pglogical;
- SELECT * FROM pg_catalog.pg_extension;
- SELECT * FROM pg_create_logical_replication_slot('replication_slot_name', 'pglogical');
Step 4 —
Create an S3 bucket if you have not created one. While creating the bucket, I recommend disabling ACLs, blocking all public access and enabling server-side encryption (SSE-S3).
Go to IAM → Policy → Create Policy and add the following policy document, replacing buckettest2 with the name of your bucket.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:PutObjectTagging"
      ],
      "Resource": [
        "arn:aws:s3:::buckettest2/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::buckettest2"
      ]
    }
  ]
}
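If you set this up for more than one bucket, the same policy can be generated instead of edited by hand. The following is only a minimal sketch: the helper name dms_s3_policy is hypothetical, and the actions are copied from the policy document above.

```python
import json


def dms_s3_policy(bucket: str) -> dict:
    """Build the DMS-to-S3 policy document shown above for one bucket.

    dms_s3_policy is an illustrative helper, not part of any AWS SDK.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Object-level permissions apply to the keys inside the bucket.
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:DeleteObject", "s3:PutObjectTagging"],
                "Resource": [f"arn:aws:s3:::{bucket}/*"],
            },
            {
                # Listing is a bucket-level permission, so there is no /* suffix.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket}"],
            },
        ],
    }


if __name__ == "__main__":
    # Paste the printed JSON into the IAM policy editor.
    print(json.dumps(dms_s3_policy("buckettest2"), indent=2))
```

Note the difference between the two Resource values: object actions need the /* suffix, while s3:ListBucket targets the bucket ARN itself.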
Step 5—
Go to IAM → Role → Create Role
- The IAM role needs DMS (dms.amazonaws.com) as a trusted entity. To add it, select AWS Service and then select DMS from the other AWS services.
- Attach the policy created above to your new role.
- After providing the role name, Create the role.
Step 6 —
Go to DMS → Create a replication instance → select a lower engine version.
- Select the VPC and Add subnets, if any.
- If there is a security group for RDS, make sure it allows inbound traffic on the specific database port and keeps outbound traffic open for all, as DMS requires this to export files into S3.
- If you choose Multi-AZ production, do not choose AZ settings.
- Create the instance.
Step 7 —
Go to Endpoints → Create an endpoint.
- Select Source endpoint → select RDS DB instance.
- In the Endpoint configuration, either provide access to Postgres manually by providing a username and password or select AWS Secrets Manager if you have the setup ready.
- Keep the KMS key as default.
- In Test endpoint connection, select VPC → Select replication instance created above → test the connection → Create the endpoint.
Similarly, create another endpoint.
- Select Target endpoint → Target engine as AWS S3
- Provide the ARN of the role created above.
- Provide bucket name and bucket folder.
- In Test endpoint connection, select VPC → Select replication instance created above → test the connection → Create the endpoint.
Step 8 —
Go to Migration task → Create Migration Task
- Choose Replication Instance — the instance that we created above.
- Select Source and Target endpoint — Source as Postgres and Target as AWS S3
- Select Migration type: there are three options, “Migrate existing data”, “Migrate existing data and replicate ongoing changes”, and “Replicate data changes only”. Select the option based on your requirement.
- In Task settings, select “Do nothing” in the Target table preparation mode and keep the other options at their default settings. You can enable CloudWatch logs if you face any errors or want to track the transaction logs.
- In Table mappings, click on Add new selection rule. If you want to include or exclude a specific schema or table, you can replace % with the schema name & table name. Select the action based on your requirement.
- There are multiple transformation options available in DMS tasks. If you want to transform the output tables, such as adding prefixes or a column, then you can select the transformation action and its value.
- I would recommend selecting “Enable premigration assessment run”. This assessment is helpful to test the connections and other correlations. You just need to provide the S3 bucket and AWS role we created above. Once you create the task, you can see the assessment report in your S3 bucket, or you can also check it in DMS tasks.
- Create the Task. Once the premigration assessment report is successful, start the task.
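The selection rules you click together in the console correspond to a JSON table-mapping document, which can also be pasted into the task's JSON editor. Below is a rough sketch of building one. The helper names and the public.users example are assumptions for illustration, and only the include and exclude actions are covered.

```python
import json


def selection_rule(rule_id: int, schema: str = "%", table: str = "%",
                   action: str = "include") -> dict:
    """Return one DMS selection rule; % is the wildcard used in the console."""
    if action not in ("include", "exclude"):
        raise ValueError(f"unsupported rule action: {action!r}")
    return {
        "rule-type": "selection",
        "rule-id": str(rule_id),
        "rule-name": str(rule_id),
        "object-locator": {"schema-name": schema, "table-name": table},
        "rule-action": action,
    }


def table_mappings(*rules: dict) -> str:
    """Wrap the rules in the top-level document expected by a DMS task."""
    return json.dumps({"rules": list(rules)}, indent=2)


if __name__ == "__main__":
    # Hypothetical example: replicate only the users table of the public schema.
    print(table_mappings(selection_rule(1, schema="public", table="users")))
```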
For each source table that contains records, AWS DMS creates a folder under the specified target folder. AWS DMS writes all full load and CDC files to the specified Amazon S3 bucket. There will be two types of files in the folder:
- Historical files — These files will have the prefix “LOAD”.
- Incremental files — These files will have the name as the current timestamp.
AWS DMS names files created during a full load using an incremental hexadecimal counter, for example LOAD00001.csv, LOAD00002…, LOAD00009, LOAD0000A, and so on for .csv files. AWS DMS names CDC files using timestamps, for example 20141029-1134010000.csv.
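When you inspect the bucket, it helps to separate historical files from incremental ones by name. Here is a small sketch based only on the two naming patterns described above. The function name classify and the example keys are assumptions, and the patterns are deliberately loose about the number of digits.

```python
import re

# Full-load files: "LOAD" followed by an uppercase hexadecimal counter.
FULL_LOAD = re.compile(r"LOAD[0-9A-F]+\.csv")
# CDC files: an 8-digit date, a hyphen, then a numeric time component.
CDC = re.compile(r"\d{8}-\d+\.csv")


def classify(key: str) -> str:
    """Classify an S3 object key as 'full_load', 'cdc' or 'other'."""
    name = key.rsplit("/", 1)[-1]  # drop the folder part of the key
    if FULL_LOAD.fullmatch(name):
        return "full_load"
    if CDC.fullmatch(name):
        return "cdc"
    return "other"


if __name__ == "__main__":
    for key in ("public/users/LOAD0000A.csv",
                "public/users/20141029-1134010000.csv"):
        print(key, "->", classify(key))
```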
When you use AWS DMS to replicate data changes using a CDC task, the first column of the .csv or .parquet output file indicates how the row data was changed as shown for the following .csv file.
I,101,Smith,Bob,4-Jun-19,Vancouver
U,101,Smith,Bob,8-Oct-21,London
U,101,Smith,Bob,13-Mar-22,Texas
D,101,Smith,Bob,13-Mar-22,Toronto
Here, I — Insert, U — Update, D — Delete
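To see what these operation codes mean for the final state of a table, the sample rows can be replayed in order. This is only an illustrative sketch: it assumes the second column (101) is the primary key and treats an update as an upsert, which is a simplification of what the Snowflake merge does later in this article.

```python
import csv
import io

SAMPLE = """I,101,Smith,Bob,4-Jun-19,Vancouver
U,101,Smith,Bob,8-Oct-21,London
U,101,Smith,Bob,13-Mar-22,Texas
D,101,Smith,Bob,13-Mar-22,Toronto
"""


def apply_cdc(rows, key_index=1):
    """Replay CDC rows in order and return the resulting table as a dict.

    The first column is the operation (I, U or D); key_index points at the
    column assumed to be the primary key.
    """
    table = {}
    for row in rows:
        if not row:  # skip blank lines
            continue
        op, key, values = row[0], row[key_index], row[1:]
        if op in ("I", "U"):
            table[key] = values  # insert, or overwrite with the latest version
        elif op == "D":
            table.pop(key, None)  # delete; ignore keys we have never seen
        else:
            raise ValueError(f"unknown operation {op!r}")
    return table


if __name__ == "__main__":
    rows = list(csv.reader(io.StringIO(SAMPLE)))
    print(apply_cdc(rows[:3]))  # state after the insert and both updates
    print(apply_cdc(rows))      # state after the final delete
```

Replaying only the first three rows leaves record 101 with its latest values, and replaying all four leaves the table empty because the last row deletes the record.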
Now, let’s create the pipeline from AWS S3 to Snowflake using Snowpipe, Tasks and Streams. The Snowflake documentation is easy to understand and helpful in creating the connections between AWS S3 and Snowflake.
Steps to replicate data from AWS S3 to Snowflake
Before starting with the steps to migrate data, create the tables in Snowflake as per our data files in AWS S3.
If you are migrating historical data, create the table in Snowflake without any constraints except Not Null. Please take care of date and timestamp data types as most of the other data types are the same in Postgres and Snowflake.
If you are migrating incremental/CDC data, create the tables with an extra column for the value that DMS generates to identify whether a row is an Insert, Update or Delete. For example, if your table has four columns (id, name, email, phone), add a column at the start (say, status). The columns of your table would then be status, id, name, email, phone. I recommend adding a suffix to the table name, such as <table>_stg.
If you want a final table with all the updated records, create another table without the status column. This will be your updated table with all the latest transaction changes applied. We will use tasks and streams to maintain it. Now, follow the steps below to migrate the data from AWS S3 to Snowflake.
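The relationship between the staging table and the final table can be sketched as a small generator for the two CREATE TABLE statements. Everything here is an assumption for illustration: the helper name, the users table and the column types are not taken from a real schema, so adjust them to your own tables.

```python
def staging_and_final_ddl(table: str, columns: dict) -> tuple:
    """Return (staging DDL, final DDL) for one replicated table.

    columns maps column names to Snowflake types, in source order. The
    staging table gets an extra leading status column for the I/U/D flag.
    """
    def ddl(name: str, cols: dict) -> str:
        body = ",\n  ".join(f"{col} {col_type}" for col, col_type in cols.items())
        return f"CREATE TABLE {name} (\n  {body}\n);"

    staging_columns = {"status": "VARCHAR(1)", **columns}
    return ddl(f"{table}_stg", staging_columns), ddl(table, columns)


if __name__ == "__main__":
    # Hypothetical four-column table from the example above.
    stg, final = staging_and_final_ddl(
        "users",
        {"id": "NUMBER", "name": "VARCHAR", "email": "VARCHAR", "phone": "VARCHAR"},
    )
    print(stg)
    print(final)
```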
Step 1 — Configuring access to AWS S3.
1. Log into the AWS Management Console.
2. From the home dashboard, choose Identity & Access Management (IAM).
3. Choose Account settings from the left-hand navigation pane.
4. Expand the Security Token Service Regions list, find the AWS region corresponding to the region where your account is located, and choose Activate if the status is Inactive.
5. Choose Policies from the left-hand navigation pane.
6. Click Create Policy and then Click the JSON tab.
7. Add a policy document that will allow Snowflake to access the S3 bucket and folder.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:GetObjectVersion"
      ],
      "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::<bucket>",
      "Condition": {
        "StringLike": {
          "s3:prefix": [
            "<prefix>/*"
          ]
        }
      }
    }
  ]
}
8. Click Review policy.
9. Enter the policy name (e.g. snowflake_access) and an optional description. Click Create policy.
Step 2 — Create the IAM Role in AWS
1. Go to AWS → IAM → Roles → Create Role.
2. Select AWS account as the trusted entity type.
3. For now, use your own AWS account as the trusted account. Later, you will modify the trusted relationship and grant access to Snowflake.
4. Select the Require external ID option. Enter a dummy ID such as 0000. Later, you will modify the trusted relationship and specify the external ID for your Snowflake stage. An external ID is required to grant access to your AWS resources (i.e. S3) to a third party (i.e. Snowflake).
5. Next, locate the policy you created above and select that policy.
6. Enter a name and description for the role, and click the Create role button.
You have now created an IAM policy for a bucket, created an IAM role and attached the policy to the role. Record the Role ARN value located on the role summary page. In the next step, you will create a Snowflake integration that references this role.
Step 3 — Create a Cloud Storage Integration in Snowflake
Create a storage integration using the CREATE STORAGE INTEGRATION command. A storage integration is a Snowflake object that stores a generated identity and access management (IAM) user for your S3 cloud storage, along with an optional set of allowed or blocked storage locations (i.e. buckets).
CREATE STORAGE INTEGRATION s3_int
type = external_stage
storage_provider = 'S3'
enabled = true
storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole'
storage_allowed_locations = ('s3://mybucket1/mypath1/', 's3://mybucket2/')
storage_blocked_locations = ('s3://mybucket1/mypath3/', 's3://mybucket2/mypath4/');
Please note that the S3 bucket path in the IAM policy attached to the Snowflake role created above should cover the storage_allowed_locations in the storage integration. I recommend providing access to the complete bucket in the IAM policy if you are using multiple locations in AWS S3.
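A quick way to sanity-check this before creating the integration is to compare each allowed location with the Resource patterns in your policy. The sketch below is only an approximation, not how IAM evaluates policies: the helper names are hypothetical, and it uses Python's fnmatch as a stand-in for IAM wildcard matching, which is close enough for ordinary bucket and prefix names.

```python
from fnmatch import fnmatchcase


def location_to_arn(location: str) -> str:
    """Convert an s3://bucket/path/ location into the matching S3 ARN prefix."""
    if not location.startswith("s3://"):
        raise ValueError(f"not an S3 location: {location!r}")
    return "arn:aws:s3:::" + location[len("s3://"):]


def is_covered(location: str, policy_resources: list) -> bool:
    """Roughly check whether objects under location match any policy Resource."""
    # Probe with a made-up object key directly under the location.
    probe = location_to_arn(location).rstrip("/") + "/example-object"
    return any(fnmatchcase(probe, pattern) for pattern in policy_resources)


if __name__ == "__main__":
    resources = ["arn:aws:s3:::mybucket1/*"]  # hypothetical policy Resource
    for loc in ("s3://mybucket1/mypath1/", "s3://mybucket2/"):
        print(loc, "covered" if is_covered(loc, resources) else "NOT covered")
```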
Step 4 — Retrieve the AWS IAM User for your Snowflake Account
1. Execute the DESCRIBE INTEGRATION command to retrieve the ARN for the AWS IAM user that was created automatically for your Snowflake account:
DESC INTEGRATION s3_int;
2. Copy the values of STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID.
Step 5 — Grant the IAM User Permissions to Access Bucket Objects
1. Go to AWS → IAM → Roles → Select the Snowflake role.
2. Click on the Trust relationships tab and click the Edit trust relationship button.
3. Modify the policy document with the DESC STORAGE INTEGRATION output values you copied in the last step.
4. Policy document for IAM role:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<snowflake_user_arn>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<snowflake_external_id>"
        }
      }
    }
  ]
}
Where:
- snowflake_user_arn is the STORAGE_AWS_IAM_USER_ARN value you recorded.
- snowflake_external_id is the STORAGE_AWS_EXTERNAL_ID value you recorded.
5. Click the Update Trust Policy button. The changes are saved.
Step 6 — Setting up SQS notifications
As the data files are loaded in an S3 stage, an S3 event notification informs Snowpipe via an SQS queue that files are ready to load. Snowpipe copies the files into a queue. A Snowflake-provided virtual warehouse loads data from the queued files into the target table based on parameters defined in the specified pipe.
1. Create an external stage
use schema snowpipe_db.public;
create stage mystage
  URL = 's3://mybucket/load/files'
  storage_integration = s3_int;
2. Create a Pipe with Auto-Ingest Enabled
Here, the important point to note is that the pipe must load into the stg table, which has the additional status column. The file format is CSV because DMS writes .csv files to the bucket in this setup.
create pipe snowpipe_db.public.mypipe auto_ingest=true as
  copy into snowpipe_db.public.mytable_stg
  from @snowpipe_db.public.mystage
  file_format = (type = 'CSV');
The AUTO_INGEST=true parameter tells the pipe to read event notifications sent from an S3 bucket to an SQS queue when new data is ready to load.
3. Execute the SHOW PIPES command:
show pipes;
Note the ARN of the SQS queue for the stage in the notification_channel column. Copy the ARN to a convenient location like a notepad.
4. Log into the Amazon S3 console. In the Buckets list, choose the name of the bucket that you want to enable events for.
5. Choose Properties. Navigate to the Event Notifications section and choose Create event notification.
6. Configure an event notification for your S3 bucket using the instructions provided in the Amazon S3 documentation. Complete the fields as follows:
- Name: Name of the event notification (e.g. Auto-ingest Snowflake).
- Events: Select the ObjectCreate (All) option.
- Send to: Select SQS Queue from the dropdown list.
- SQS: Select Add SQS queue ARN from the dropdown list.
- SQS queue ARN: Paste the SQS queue ARN from the SHOW PIPES output.
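If you prefer to script this step instead of clicking through the console, the same fields map onto an S3 notification configuration. This is a sketch under assumptions: the helper name and the queue ARN in the example are placeholders, and the commented-out boto3 call assumes boto3 is installed and your credentials are configured.

```python
def snowpipe_notification(queue_arn: str, name: str = "Auto-ingest Snowflake") -> dict:
    """Build an S3 notification configuration that forwards all
    object-created events to the Snowpipe SQS queue."""
    return {
        "QueueConfigurations": [
            {
                "Id": name,                        # the "Name" field in the console
                "QueueArn": queue_arn,             # notification_channel from SHOW PIPES
                "Events": ["s3:ObjectCreated:*"],  # the "ObjectCreate (All)" option
            }
        ]
    }


if __name__ == "__main__":
    # Placeholder ARN; use the value from your own SHOW PIPES output.
    config = snowpipe_notification("arn:aws:sqs:us-east-1:001234567890:example-queue")
    print(config)
    # With boto3 (an assumption, not used in this article), this could be applied as:
    # import boto3
    # boto3.client("s3").put_bucket_notification_configuration(
    #     Bucket="mybucket", NotificationConfiguration=config)
```

Be aware that put_bucket_notification_configuration replaces the bucket's existing notification configuration, so merge in any notifications you already have before applying it.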
Snowpipe with auto-ingest is now configured!
Now, if you want to create an updated table with all the latest records, please follow the below steps.
Step 1 — Create a Stream on the STG tables.
CREATE OR REPLACE STREAM table_stg_stream ON TABLE public.table_stg;
SHOW STREAMS;
To check the stream functionality we can insert data into the staging table and query the stream to check if the data insertion is captured by the stream.
INSERT INTO table_stg
VALUES ('I', 1, 'Tom', 'tom.harry@john.com', 04129);
SELECT * FROM table_stg_stream;
Snowflake streams are versatile and can be used for various purposes, such as transaction logs, data merges, ETL pipelines and many more. In this blog, I am covering only basic streams, as the data in the staging table will be loaded through AWS DMS.
Step 2— Create a task to merge the stream with the final table.
The task merges the changes captured by the stream into final_table. I have scheduled the task at 1 minute, i.e. the task will run every minute if the condition provided is met. One minute is the minimum interval, and we cannot reduce it to less than a minute.
CREATE OR REPLACE TASK all_data_changes
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = '1 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('table_stg_stream')
AS
MERGE INTO public.final_table F -- Target table to merge changes from a source table
USING (SELECT * FROM table_stg_stream) S
  ON F.user_id = S.user_id
WHEN MATCHED AND S.status = 'D' THEN -- DELETE condition
  DELETE
WHEN MATCHED AND S.status = 'U' THEN -- UPDATE condition
  UPDATE SET F.username = S.username,
             F.email = S.email,
             F.phone = S.phone
WHEN NOT MATCHED AND S.METADATA$ACTION = 'INSERT' THEN -- INSERT condition
  INSERT (user_id, username, email, phone)
  VALUES (S.user_id, S.username, S.email, S.phone);
ALTER TASK all_data_changes RESUME;
SHOW TASKS;
After creating the task, it is important to run the ALTER TASK ... RESUME statement to start the execution of the task.
Now, you are ready to test the complete end-to-end pipeline by updating the record in Postgres and querying it in Snowflake.
Hope this helps. Let me know if you have more questions.
Email — imharmanbhatia@gmail.com
Please feel free to reach out via email, LinkedIn or Twitter.