AWS Big Data Blog

Build a Simplified ETL and Live Data Query Solution using Redshift Federated Query

You may have heard the saying that the best ETL is no ETL. Amazon Redshift now makes this possible with Federated Query. In its initial release, this feature lets you query data in Amazon Aurora PostgreSQL or Amazon RDS for PostgreSQL using Amazon Redshift external schemas. Federated Query also exposes the metadata from these source databases through system views and driver APIs, which allows business intelligence tools like Tableau and Amazon QuickSight to connect to Amazon Redshift and query data in PostgreSQL without having to make local copies. This enables a new data warehouse pattern, live data query, in which you can seamlessly retrieve data from PostgreSQL databases, or build a late binding view that combines operational PostgreSQL data, analytical data local to Amazon Redshift, and historical Amazon Redshift Spectrum data in an Amazon S3 data lake.

Simplified ETL use case

For this ETL use case, you can simplify the familiar upsert pattern with a federated query. You can bypass the need for incremental extracts in Amazon S3 and the subsequent load via COPY by querying the data in place within its source database. This change can be a single line of code that replaces the COPY command with a query to an external table. See the following code:

BEGIN;
CREATE TEMP TABLE staging (LIKE ods.store_sales);
-- replace the following COPY from S3
-- COPY staging FROM 's3://yourETLbucket/daily_store_sales/'
--      IAM_ROLE 'arn:aws:iam::<account_id>:role/<s3_reader_role>' DELIMITER '|' COMPUPDATE OFF;
-- with this federated query to load staging data from PostgreSQL source
INSERT INTO staging SELECT * FROM pg.store_sales p
    WHERE p.last_updated_date > (SELECT MAX(last_updated_date) FROM ods.store_sales);
DELETE FROM ods.store_sales USING staging s WHERE ods.store_sales.id = s.id;
INSERT INTO ods.store_sales SELECT * FROM staging;
DROP TABLE staging;
COMMIT;

In the preceding example, the table pg.store_sales resides in PostgreSQL, and you use a federated query to retrieve fresh data to load into a staging table in Amazon Redshift, keeping the actual delete and insert operations unchanged. This pattern is likely the most common application of federated queries.
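To make the semantics of this upsert explicit, here is a minimal Python sketch that replays the same steps on in-memory rows. It is an illustration only, assuming rows are dicts keyed by the id and last_updated_date columns used in the SQL; the function name upsert_from_source is hypothetical. It also surfaces one consequence worth knowing: because MAX over an empty table returns NULL, the incremental query stages nothing until ods.store_sales has had an initial load.

```python
from datetime import date
from typing import Any, Dict, List

Row = Dict[str, Any]


def upsert_from_source(target: List[Row], source: List[Row]) -> List[Row]:
    """Replay the staging-table upsert on lists of dict rows (hypothetical helper)."""
    # SELECT MAX(last_updated_date) FROM ods.store_sales; None stands in for NULL.
    high_water = max((r["last_updated_date"] for r in target), default=None)

    # INSERT INTO staging SELECT * FROM pg.store_sales p WHERE p.last_updated_date > (...)
    # A comparison with NULL is never true in SQL, so an empty target stages nothing.
    if high_water is None:
        staging: List[Row] = []
    else:
        staging = [r for r in source if r["last_updated_date"] > high_water]

    # DELETE FROM ods.store_sales USING staging s WHERE ods.store_sales.id = s.id
    staged_ids = {r["id"] for r in staging}
    kept = [r for r in target if r["id"] not in staged_ids]

    # INSERT INTO ods.store_sales SELECT * FROM staging
    return kept + staging


if __name__ == "__main__":
    ods = [
        {"id": 1, "last_updated_date": date(2020, 1, 1), "amount": 10},
        {"id": 2, "last_updated_date": date(2020, 1, 2), "amount": 20},
    ]
    pg = [
        {"id": 2, "last_updated_date": date(2020, 1, 3), "amount": 25},  # changed row
        {"id": 3, "last_updated_date": date(2020, 1, 4), "amount": 30},  # new row
    ]
    for row in upsert_from_source(ods, pg):
        print(row["id"], row["amount"])
```

The delete-then-insert order matters: deleting by id first ensures a changed row replaces its old version instead of duplicating it.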

Setting up an external schema

The external schema pg in the preceding example was set up as follows:

CREATE EXTERNAL SCHEMA IF NOT EXISTS pg
FROM POSTGRES
DATABASE 'dev'
SCHEMA 'retail'
URI 'database-1.cluster-ro-samplecluster.us-east-1.rds.amazonaws.com'
PORT 5432
IAM_ROLE 'arn:aws:iam::555566667777:role/myFederatedQueryRDS'
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:555566667777:secret:MyRDSCredentials-TfzFSB';

If you’re familiar with the CREATE EXTERNAL SCHEMA command from using it with Amazon Redshift Spectrum, note the new parameter options that enable federated queries:

FROM POSTGRES
DATABASE 'dev'
SCHEMA 'retail'

Whereas Amazon Redshift Spectrum references an external data catalog in AWS Glue, Amazon Athena, or an Apache Hive metastore, this code points to a PostgreSQL catalog. Expect more keywords to be added to FROM as Amazon Redshift supports more source databases for federated querying. If you don't specify SCHEMA, it defaults to public.
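A small sketch of this name resolution may help. The helper remote_table_name is hypothetical (Amazon Redshift does this internally); it only shows how a table referenced through an external schema maps to a PostgreSQL database and schema, including the public default when SCHEMA is omitted.

```python
from typing import Optional


def remote_table_name(local_ref: str, external_schema: str, database: str,
                      remote_schema: Optional[str] = None) -> str:
    """Map a reference such as 'pg.store_sales' to the PostgreSQL table it targets.

    Hypothetical helper for illustration only.
    """
    schema, sep, table = local_ref.partition(".")
    if not sep or schema != external_schema:
        raise ValueError(f"{local_ref!r} is not in external schema {external_schema!r}")
    # When CREATE EXTERNAL SCHEMA omits SCHEMA, the remote schema defaults to public.
    effective_schema = remote_schema if remote_schema is not None else "public"
    return f"{database}.{effective_schema}.{table}"


if __name__ == "__main__":
    print(remote_table_name("pg.store_sales", "pg", "dev", "retail"))
    print(remote_table_name("useast.lineitem", "useast", "dev"))
```

The first call corresponds to the pg schema above (SCHEMA 'retail'); the second corresponds to a schema created without SCHEMA, such as the useast schema later in this post.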

Within the target database, you identify DATABASE 'dev' and SCHEMA 'retail', so any query to the Amazon Redshift table pg.<some_table> is issued to PostgreSQL as a request for retail.<some_table> in the dev database. Amazon Redshift pushes query predicates down so that they run entirely in PostgreSQL, which reduces the result set returned to Amazon Redshift for subsequent operations. Going further, the query planner derives cardinality estimates for external tables to optimize joins between Amazon Redshift and PostgreSQL. From the preceding example:

URI 'database-1.cluster-ro-samplecluster.us-east-1.rds.amazonaws.com'
PORT 5432

The URI and PORT parameters that reference both the PostgreSQL endpoint and port are self-explanatory, but there are a few things to consider in your configuration:

  • Use a read replica endpoint in Aurora or Amazon RDS for PostgreSQL to reduce load on the primary instance.
  • Set up your Amazon RDS for PostgreSQL instance, Aurora Serverless or provisioned instances, and Amazon Redshift clusters to use the same VPC and subnet groups. That way, you can add the security group for the cluster to the inbound rules of the security group for the Aurora or Amazon RDS for PostgreSQL instance.
  • If Amazon Redshift and Aurora or Amazon RDS for PostgreSQL are in different VPCs, set up VPC peering. For more information, see What is VPC Peering?

Configuring AWS Secrets Manager for remote database credentials

To retrieve AWS Secrets Manager remote database credentials, our example uses the following code:

IAM_ROLE 'arn:aws:iam::555566667777:role/myFederatedQueryRDS'
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:555566667777:secret:MyRDSCredentials-TfzFSB'

These two parameters are interrelated because the SECRET_ARN is also embedded in the IAM policy for the role.

If a service like Secrets Manager didn’t exist and you wanted to issue a federated query from Amazon Redshift to PostgreSQL, you would need to supply the database credentials to the CREATE EXTERNAL SCHEMA command via a parameter like CREDENTIALS, which you also use with the COPY command. However, this hardcoded approach doesn’t take into account that the PostgreSQL credentials could expire.

You avoid this problem by keeping PostgreSQL database credentials in Secrets Manager, which provides a centralized service to manage secrets. Amazon Redshift retrieves and uses these credentials itself, so they are transient: they are not stored in any generated code, and they are discarded after query execution.

Storing credentials in Secrets Manager takes only a few minutes. To store a new secret, complete the following steps:

  1. On the Secrets Manager console, choose Secrets.
  2. Choose Store a new secret.
  3. In the Store a new secret section, complete the following:
       • Supply your PostgreSQL database credentials.
       • Name the secret; for example, MyRDSCredentials.
       • Configure rotation (you can enable this at a later time).
       • Optionally, copy programmatic code for accessing your secret in your preferred programming language (this post doesn't need it).
  4. Choose Next.

You can also retrieve the credentials easily.

  1. On the Secrets Manager console, choose your secret.
  2. Choose Retrieve secret value.

The following screenshot shows you the secret value details.

This secret is now an AWS resource referenced via a secret ARN. See the following screenshot.
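Federated queries don't need any client code, because Amazon Redshift reads the secret itself. If you want to confirm programmatically that a secret holds what you expect, the following sketch reads it through an object that behaves like the Secrets Manager client in the AWS SDK for Python (Boto3), whose get_secret_value call returns the secret text in a SecretString field. The function name, and the assumption that the secret is a JSON document with username and password keys, are illustrative.

```python
import json
from typing import Any, Dict

# Assumed layout of the stored secret; adjust to match how you created it.
REQUIRED_KEYS = {"username", "password"}


def fetch_db_credentials(client: Any, secret_arn: str) -> Dict[str, Any]:
    """Read a database secret stored as a JSON string (hypothetical helper).

    `client` is expected to behave like boto3.client("secretsmanager").
    """
    response = client.get_secret_value(SecretId=secret_arn)
    secret = json.loads(response["SecretString"])
    missing = REQUIRED_KEYS - set(secret)
    if missing:
        raise KeyError(f"secret {secret_arn} is missing keys: {sorted(missing)}")
    return secret
```

With Boto3 installed, you could pass boto3.client("secretsmanager", region_name="us-east-1") as the client. Passing the client in, rather than creating it inside the function, keeps the sketch testable without AWS access. Avoid printing or logging the returned password.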

Setting up an IAM role

You can now pull everything together by embedding the secret ARN into an IAM policy, naming the policy, and attaching it to an IAM role. See the following code:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AccessSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:us-east-1:555566667777:secret:MyRDSCredentials-TfzFSB"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetRandomPassword",
                "secretsmanager:ListSecrets"
            ],
            "Resource": "*"
        }
    ]
}
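To see why the SECRET_ARN and the policy are tied together, the following rough sketch checks whether a policy document allows a given action on a given secret ARN. It is a simplified, hypothetical evaluator, not the IAM policy engine: it considers only Allow statements, ignores Deny, NotAction, NotResource, and Condition, and approximates IAM wildcards with shell-style glob matching.

```python
from fnmatch import fnmatchcase
from typing import Any, Dict, List, Union

# The policy from the preceding example, as a Python dict.
MY_RDS_SECRET_POLICY: Dict[str, Any] = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AccessSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds",
            ],
            "Resource": "arn:aws:secretsmanager:us-east-1:555566667777:secret:MyRDSCredentials-TfzFSB",
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": ["secretsmanager:GetRandomPassword", "secretsmanager:ListSecrets"],
            "Resource": "*",
        },
    ],
}


def _as_list(value: Union[str, List[str]]) -> List[str]:
    # Action and Resource may be a single string or a list of strings.
    return [value] if isinstance(value, str) else list(value)


def policy_allows(policy: Dict[str, Any], action: str, resource: str) -> bool:
    """Rough check of whether any Allow statement covers (action, resource)."""
    for stmt in policy.get("Statement", []):
        if stmt.get("Effect") != "Allow":
            continue
        # IAM action names are case-insensitive, so compare them lowercased.
        action_ok = any(fnmatchcase(action.lower(), a.lower())
                        for a in _as_list(stmt.get("Action", [])))
        # Resource ARNs are compared case-sensitively; '*' and '?' act as globs.
        resource_ok = any(fnmatchcase(resource, r)
                          for r in _as_list(stmt.get("Resource", [])))
        if action_ok and resource_ok:
            return True
    return False
```

For example, the GetSecretValue check succeeds only for the MyRDSCredentials-TfzFSB ARN, so pointing SECRET_ARN in CREATE EXTERNAL SCHEMA at a different secret without updating the policy would leave Amazon Redshift unable to read it.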

The following screenshot shows the details of the IAM role called myFederatedQueryRDS, which contains the MyRDSSecretPolicy policy. It’s the same role that’s supplied in the IAM_ROLE parameter of the CREATE EXTERNAL SCHEMA DDL.

Finally, attach the same IAM role to your Amazon Redshift cluster.

  1. On the Amazon Redshift console, choose your cluster.
  2. From the Actions drop-down menu, choose Manage IAM roles.
  3. Choose and add the IAM role you just created.

You have now completed the following steps:

  1. Store your PostgreSQL database credentials in Secrets Manager.
  2. Create an IAM policy and role.
  3. Create an Amazon Redshift external schema definition that uses the secret and IAM role to authenticate with a PostgreSQL endpoint.
  4. Apply a mapping between an Amazon Redshift database and schema and a PostgreSQL database and schema so Amazon Redshift can issue queries to PostgreSQL tables.

You only need to complete this configuration one time.

Querying live operational data

This section explores another use case: querying operational data across multiple source databases. In this use case, a global online retailer has databases deployed by different teams across distinct geographies:

  • Region us-east-1 runs serverless Aurora PostgreSQL.
  • Region us-west-1 runs provisioned Aurora PostgreSQL, which is also configured as a global database with a read replica in us-east-1.
  • Region eu-west-1 runs an Amazon RDS for PostgreSQL instance with a read replica in us-east-1.

Serverless and provisioned Aurora PostgreSQL and Amazon RDS for PostgreSQL are visible in the Amazon RDS console in Region us-east-1. See the following screenshot:

For this use case, assume that you configured the read replicas for Aurora and Amazon RDS to share the same VPC and subnets in us-east-1 as the local serverless Aurora PostgreSQL. Furthermore, you have already created secrets for each of these instances’ credentials, and also an IAM role, MyCombinedRDSFederatedQuery, with a policy, MyCombinedRDSSecretPolicy, that is more permissive and allows Amazon Redshift to retrieve the value of any secret in the account within any Region. In actual production use, however, be mindful of security and explicitly specify the resource ARN for each secret in separate statements in your IAM policy. See the following code:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AccessSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:*:555566667777:secret:*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetRandomPassword",
                "secretsmanager:ListSecrets"
            ],
            "Resource": "*"
        }
    ]
}
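Following the advice above, a least-privilege policy lists each secret explicitly. This sketch generates one Allow statement per secret ARN with the same four read actions as the preceding policy. The function name is hypothetical, and dropping the GetRandomPassword and ListSecrets statement is an assumption that reading known secrets doesn't need it; add it back if your workflow does.

```python
import json
from typing import Any, Dict, List

SECRET_READ_ACTIONS = [
    "secretsmanager:GetResourcePolicy",
    "secretsmanager:GetSecretValue",
    "secretsmanager:DescribeSecret",
    "secretsmanager:ListSecretVersionIds",
]


def per_secret_policy(secret_arns: List[str]) -> Dict[str, Any]:
    """Build a policy with one Allow statement per secret ARN (hypothetical helper)."""
    statements = []
    for i, arn in enumerate(secret_arns, start=1):
        statements.append({
            "Sid": f"AccessSecret{i}",  # Sid values must be unique within a policy
            "Effect": "Allow",
            "Action": list(SECRET_READ_ACTIONS),
            "Resource": arn,  # one explicit ARN, no wildcards
        })
    return {"Version": "2012-10-17", "Statement": statements}


if __name__ == "__main__":
    arns = [
        "arn:aws:secretsmanager:us-east-1:555566667777:secret:MyEastUSAuroraServerlessCredentials-dXOlEq",
        "arn:aws:secretsmanager:us-west-1:555566667777:secret:MyWestUSAuroraGlobalDBCredentials-p3sV9m",
        "arn:aws:secretsmanager:eu-west-1:555566667777:secret:MyEuropeRDSPostgresCredentials-mz2u9L",
    ]
    print(json.dumps(per_secret_policy(arns), indent=4))
```

The printed JSON can be pasted into the IAM console's JSON policy editor in place of the wildcard policy.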

External schema DDLs in Amazon Redshift can then reference the combined IAM role and individual secret ARNs. See the following code:

CREATE EXTERNAL SCHEMA IF NOT EXISTS useast
FROM POSTGRES
DATABASE 'dev'
URI 'us-east-1-aurora-pg-serverless.cluster-samplecluster.us-east-1.rds.amazonaws.com'
PORT 5432
IAM_ROLE 'arn:aws:iam::555566667777:role/MyCombinedRDSFederatedQuery'
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:555566667777:secret:MyEastUSAuroraServerlessCredentials-dXOlEq'
;

CREATE EXTERNAL SCHEMA IF NOT EXISTS uswest
FROM POSTGRES
DATABASE 'dev'
URI 'global-aurora-pg-west-coast-stores-instance-1.samplecluster.us-east-1.rds.amazonaws.com'
PORT 5432
IAM_ROLE 'arn:aws:iam::555566667777:role/MyCombinedRDSFederatedQuery'
SECRET_ARN 'arn:aws:secretsmanager:us-west-1:555566667777:secret:MyWestUSAuroraGlobalDBCredentials-p3sV9m'
;

CREATE EXTERNAL SCHEMA IF NOT EXISTS europe
FROM POSTGRES
DATABASE 'dev'
URI 'eu-west-1-postgres-read-replica.samplecluster.us-east-1.rds.amazonaws.com'
PORT 5432
IAM_ROLE 'arn:aws:iam::555566667777:role/MyCombinedRDSFederatedQuery'
SECRET_ARN 'arn:aws:secretsmanager:eu-west-1:555566667777:secret:MyEuropeRDSPostgresCredentials-mz2u9L'
;
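Because the three DDL statements differ only in schema name, endpoint, and secret, you could generate them from a small table of sources. The following sketch is one way to do that; the PostgresSource dataclass and external_schema_ddl function are hypothetical names, and the output only mirrors the statement layout shown above. SCHEMA is emitted only when set, so PostgreSQL's public schema is used otherwise.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PostgresSource:
    """Connection details for one external schema (field names are illustrative)."""
    schema_name: str
    uri: str
    secret_arn: str
    database: str = "dev"
    remote_schema: Optional[str] = None  # omitted -> PostgreSQL schema public
    port: int = 5432


def external_schema_ddl(src: PostgresSource, iam_role_arn: str) -> str:
    """Render a CREATE EXTERNAL SCHEMA ... FROM POSTGRES statement."""
    lines = [
        f"CREATE EXTERNAL SCHEMA IF NOT EXISTS {src.schema_name}",
        "FROM POSTGRES",
        f"DATABASE '{src.database}'",
    ]
    if src.remote_schema is not None:
        lines.append(f"SCHEMA '{src.remote_schema}'")
    lines += [
        f"URI '{src.uri}'",
        f"PORT {src.port}",
        f"IAM_ROLE '{iam_role_arn}'",
        f"SECRET_ARN '{src.secret_arn}'",
    ]
    return "\n".join(lines) + "\n;"


if __name__ == "__main__":
    role = "arn:aws:iam::555566667777:role/MyCombinedRDSFederatedQuery"
    useast = PostgresSource(
        "useast",
        "us-east-1-aurora-pg-serverless.cluster-samplecluster.us-east-1.rds.amazonaws.com",
        "arn:aws:secretsmanager:us-east-1:555566667777:secret:MyEastUSAuroraServerlessCredentials-dXOlEq",
    )
    print(external_schema_ddl(useast, role))
```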

A late binding view can abstract the underlying queries to the TPC-H lineitem test data in all three PostgreSQL instances. See the following code:

CREATE VIEW global_lineitem AS
SELECT 'useast' AS region, * from useast.lineitem
UNION ALL
SELECT 'uswest', * from uswest.lineitem
UNION ALL
SELECT 'europe', * from europe.lineitem
WITH NO SCHEMA BINDING
;

With this view, Amazon Redshift can query live operational data across multiple distributed databases and aggregate the results in a single query. See the following code:

dev=# SELECT region, extract(month from l_shipdate) as month,
      sum(l_extendedprice * l_quantity) - sum(l_discount) as sales
      FROM global_lineitem
      WHERE l_shipdate >= '1997-01-01'
      AND l_shipdate < '1998-01-01'
      AND month < 4
      GROUP BY 1, 2
      ORDER BY 1, 2
;
 region | month |      sales
--------+-------+------------------
 europe |     1 | 16036160823.3700
 europe |     2 | 15089300790.7200
 europe |     3 | 16579123912.6700
 useast |     1 | 16176034865.7100
 useast |     2 | 14624520114.6700
 useast |     3 | 16645469098.8600
 uswest |     1 | 16800599170.4600
 uswest |     2 | 14547930407.7000
 uswest |     3 | 16595334825.9200
(9 rows)

If you examine the Remote PG Seq Scan nodes in the following query plan, you can see that predicates are pushed down for execution in all three PostgreSQL databases. Unlike the initial simplified ETL use case, no ETL is performed, because data is queried and filtered in place. See the following code:

dev=# EXPLAIN SELECT region, extract(month from l_shipdate) as month,
      sum(l_extendedprice * l_quantity) - sum(l_discount) as sales
FROM global_lineitem
WHERE l_shipdate >= '1997-01-01'
AND l_shipdate < '1998-01-01'
AND month < 4
GROUP BY 1, 2
ORDER BY 1, 2
;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Merge  (cost=1000000060145.67..1000000060146.17 rows=200 width=100)
   Merge Key: derived_col1, derived_col2
   ->  XN Network  (cost=1000000060145.67..1000000060146.17 rows=200 width=100)
         Send to leader
         ->  XN Sort  (cost=1000000060145.67..1000000060146.17 rows=200 width=100)
               Sort Key: derived_col1, derived_col2
               ->  XN HashAggregate  (cost=60136.52..60138.02 rows=200 width=100)
                     ->  XN Subquery Scan global_lineitem  (cost=20037.51..60130.52 rows=600 width=100)
                           ->  XN Append  (cost=20037.51..60124.52 rows=600 width=52)
                                 ->  XN Subquery Scan "*SELECT* 1"  (cost=20037.51..20041.51 rows=200 width=52)
                                       ->  XN HashAggregate  (cost=20037.51..20039.51 rows=200 width=52)
                                             ->  XN PG Query Scan lineitem  (cost=0.00..20020.84 rows=1667 width=52)
                                                   ->  Remote PG Seq Scan useast.lineitem  (cost=0.00..20000.00 rows=1667 width=52)
                                                         Filter: ((l_shipdate < '1998-01-01'::date) AND (l_shipdate >= '1997-01-01'::date) AND ("date_part"('month'::text, l_shipdate) < 4))
                                 ->  XN Subquery Scan "*SELECT* 2"  (cost=20037.51..20041.51 rows=200 width=52)
                                       ->  XN HashAggregate  (cost=20037.51..20039.51 rows=200 width=52)
                                             ->  XN PG Query Scan lineitem  (cost=0.00..20020.84 rows=1667 width=52)
                                                   ->  Remote PG Seq Scan uswest.lineitem  (cost=0.00..20000.00 rows=1667 width=52)
                                                         Filter: ((l_shipdate < '1998-01-01'::date) AND (l_shipdate >= '1997-01-01'::date) AND ("date_part"('month'::text, l_shipdate) < 4))
                                 ->  XN Subquery Scan "*SELECT* 3"  (cost=20037.51..20041.51 rows=200 width=52)
                                       ->  XN HashAggregate  (cost=20037.51..20039.51 rows=200 width=52)
                                             ->  XN PG Query Scan lineitem  (cost=0.00..20020.84 rows=1667 width=52)
                                                   ->  Remote PG Seq Scan europe.lineitem  (cost=0.00..20000.00 rows=1667 width=52)
                                                         Filter: ((l_shipdate < '1998-01-01'::date) AND (l_shipdate >= '1997-01-01'::date) AND ("date_part"('month'::text, l_shipdate) < 4))
(24 rows)
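The benefit of predicate pushdown is easy to quantify in a toy model. The following sketch is a conceptual illustration, not how Amazon Redshift executes remote scans: it counts the rows that would cross the network with and without pushdown for the same filter that appears in the plan. The function names and the synthetic one-row-per-month data are assumptions.

```python
from datetime import date
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]


def query_remote(rows: List[Row], predicate: Callable[[Row], bool],
                 pushdown: bool) -> Tuple[int, List[Row]]:
    """Return (rows sent over the network, rows that satisfy the query)."""
    if pushdown:
        # Filter runs in PostgreSQL; only matching rows are shipped.
        shipped = [r for r in rows if predicate(r)]
        return len(shipped), shipped
    # No pushdown: every row is shipped, then filtered in Amazon Redshift.
    shipped = list(rows)
    return len(shipped), [r for r in shipped if predicate(r)]


def lineitem_filter(r: Row) -> bool:
    # Same predicate as the Filter lines in the plan above.
    d = r["l_shipdate"]
    return date(1997, 1, 1) <= d < date(1998, 1, 1) and d.month < 4


if __name__ == "__main__":
    rows = [{"l_shipdate": date(y, m, 1)} for y in (1996, 1997, 1998) for m in range(1, 13)]
    print(query_remote(rows, lineitem_filter, pushdown=True)[0])
    print(query_remote(rows, lineitem_filter, pushdown=False)[0])
```

Both modes return the same result rows; only the volume shipped differs, which is why pushdown matters most for selective filters over large remote tables.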

Combining the data lake, data warehouse, and live operational data

In this next use case, you join Amazon Redshift Spectrum historical data with current data in Amazon Redshift and live data in PostgreSQL. You use a 3 TB TPC-DS dataset and unload data from 1998 through 2001 from the store_sales table in Amazon Redshift to Amazon S3. The unloaded files are stored in Parquet format with ss_sold_date_sk as the partitioning key.

To access this historical data via Amazon Redshift Spectrum, create an external table. See the following code:

CREATE EXTERNAL TABLE spectrum.store_sales_historical
(
  ss_sold_time_sk int ,
  ss_item_sk int ,
  ss_customer_sk int ,
  ss_cdemo_sk int ,
  ss_hdemo_sk int ,
  ss_addr_sk int ,
  ss_store_sk int ,
  ss_promo_sk int ,
  ss_ticket_number bigint,
  ss_quantity int ,
  ss_wholesale_cost numeric(7,2) ,
  ss_list_price numeric(7,2) ,
  ss_sales_price numeric(7,2) ,
  ss_ext_discount_amt numeric(7,2) ,
  ss_ext_sales_price numeric(7,2) ,
  ss_ext_wholesale_cost numeric(7,2) ,
  ss_ext_list_price numeric(7,2) ,
  ss_ext_tax numeric(7,2) ,
  ss_coupon_amt numeric(7,2) ,
  ss_net_paid numeric(7,2) ,
  ss_net_paid_inc_tax numeric(7,2) ,
  ss_net_profit numeric(7,2)
)
PARTITIONED BY (ss_sold_date_sk int)
STORED AS PARQUET
LOCATION 's3://mysamplebucket/historical_store_sales/';   

The external schema spectrum is defined as follows:

CREATE EXTERNAL SCHEMA spectrum
FROM data catalog DATABASE 'spectrumdb'
IAM_ROLE 'arn:aws:iam::555566667777:role/mySpectrumRole'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

Instead of an Amazon S3 read-only policy, the IAM role mySpectrumRole has both the AmazonS3FullAccess and AWSGlueConsoleFullAccess policies attached; the former allows Amazon Redshift to write to Amazon S3, which the following UNLOAD command requires. See the following code:

UNLOAD ('SELECT * FROM tpcds.store_sales WHERE ss_sold_date_sk < 2452276')
TO 's3://mysamplebucket/historical_store_sales/'
IAM_ROLE 'arn:aws:iam::555566667777:role/mySpectrumRole'
FORMAT AS PARQUET
PARTITION BY (ss_sold_date_sk) ALLOWOVERWRITE;

To make partitioned data visible, the ALTER TABLE ... ADD PARTITION command needs to specify all partition values. For this use case, 2450816 through 2452275 correspond to dates 1998-01-02 through 2001-12-31, respectively. To generate these DDLs quickly, use the following code:

WITH partitions AS (SELECT * FROM generate_series(2450816, 2452275))
SELECT 'ALTER TABLE spectrum.store_sales_historical ADD PARTITION (ss_sold_date_sk='|| generate_series || ') '
    || 'LOCATION \'s3://mysamplebucket/historical_store_sales/ss_sold_date_sk=' || generate_series || '/\';'
FROM partitions;
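If you prefer to build the partition DDL outside the database, the following sketch produces the same statements as the generate_series query. The function name is hypothetical; the table name, S3 location, and key range come from the preceding example.

```python
from typing import List

TABLE = "spectrum.store_sales_historical"
BASE_LOCATION = "s3://mysamplebucket/historical_store_sales/"


def add_partition_statements(first_sk: int, last_sk: int,
                             table: str = TABLE, base: str = BASE_LOCATION) -> List[str]:
    """Python equivalent of the generate_series query: one ALTER TABLE per value."""
    statements = []
    # generate_series(first, last) includes both endpoints, hence last_sk + 1.
    for sk in range(first_sk, last_sk + 1):
        statements.append(
            f"ALTER TABLE {table} ADD PARTITION (ss_sold_date_sk={sk}) "
            f"LOCATION '{base}ss_sold_date_sk={sk}/';"
        )
    return statements


if __name__ == "__main__":
    stmts = add_partition_statements(2450816, 2452275)
    print(len(stmts))
    print(stmts[0])
```

Writing the list to a .sql file gives you a script to run as a batch.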

You can run the generated ALTER TABLE statements individually or as a batch to make partition data visible. See the following code:

ALTER TABLE spectrum.store_sales_historical 
ADD PARTITION (ss_sold_date_sk=2450816)
LOCATION 's3://mysamplebucket/historical_store_sales/ss_sold_date_sk=2450816/';
-- repeated for all partition values
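The mapping between ss_sold_date_sk values and calendar dates follows from TPC-DS storing d_date_sk as a Julian Day Number. Assuming that encoding, the following sketch converts between the two and checks the boundaries quoted above, including 2452276 (the UNLOAD cutoff) as 2002-01-01. The function names are hypothetical.

```python
from datetime import date

# Python ordinals count days from 0001-01-01 (ordinal 1); adding this offset
# gives the Julian Day Number (for example, 2000-01-01 -> 2451545).
JDN_OFFSET = 1721425


def date_sk_to_date(sk: int) -> date:
    """Assumes d_date_sk is a Julian Day Number, as in TPC-DS date_dim."""
    return date.fromordinal(sk - JDN_OFFSET)


def date_to_date_sk(d: date) -> int:
    """Inverse of date_sk_to_date under the same assumption."""
    return d.toordinal() + JDN_OFFSET


if __name__ == "__main__":
    for sk in (2450816, 2452275, 2452276):
        print(sk, date_sk_to_date(sk).isoformat())
```

This also confirms the partition count: 2450816 through 2452275 inclusive is 1,460 keys, one per day from 1998-01-02 to 2001-12-31.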

The three combined sources in the following view consist of historical data in Amazon S3 for 1998 through 2001, current data local to Amazon Redshift for 2002, and live data for two months of 2003 in PostgreSQL. When you create this late binding view, you have to reorder the Amazon Redshift Spectrum external table columns, because the previous UNLOAD operation, which specified ss_sold_date_sk as the partition key, moved that column to the last position. See the following code:

CREATE VIEW store_sales_integrated AS
SELECT * FROM uswest.store_sales_live
UNION ALL
SELECT * FROM tpcds.store_sales_current
UNION ALL
SELECT ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, 
       ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, 
       ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price, 
       ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, 
       ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, 
       ss_net_paid_inc_tax, ss_net_profit
FROM spectrum.store_sales_historical
WITH NO SCHEMA BINDING;
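The column reordering can also be generated rather than typed by hand. In this sketch, which uses hypothetical helper names, the partition column is moved from the last position back to the front, so the SELECT list lines up with the other two branches of the UNION ALL, which begin with ss_sold_date_sk.

```python
from typing import List


def reorder_partition_column(unloaded_columns: List[str], partition_column: str,
                             position: int = 0) -> List[str]:
    """Move the partition column back to its original position.

    With ss_sold_date_sk as the partition key, the external table lists it
    after all other columns; the UNION ALL needs matching column order.
    """
    if partition_column not in unloaded_columns:
        raise ValueError(f"{partition_column} not found")
    columns = [c for c in unloaded_columns if c != partition_column]
    columns.insert(position, partition_column)
    return columns


def select_clause(columns: List[str], table: str) -> str:
    """Render an explicit SELECT list for one branch of the view."""
    return f"SELECT {', '.join(columns)}\nFROM {table}"


if __name__ == "__main__":
    unloaded = ["ss_sold_time_sk", "ss_item_sk", "ss_customer_sk", "ss_sold_date_sk"]
    cols = reorder_partition_column(unloaded, "ss_sold_date_sk")
    print(select_clause(cols, "spectrum.store_sales_historical"))
```

The example uses a shortened column list; in practice you would pass all 23 store_sales columns in the order the external table reports them.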

You can now run a query on the view to aggregate data and join tables across the three sources. See the following code:

dev=# SELECT extract(year from b.d_date), count(a.ss_sold_date_sk)
FROM store_sales_integrated a
JOIN tpcds.date_dim b on (a.ss_sold_date_sk = b.d_date_sk)
GROUP BY 1
ORDER BY 1
;
 date_part |   count
-----------+------------
      1998 | 1632403114
      1999 | 1650163390
      2000 | 1659168880
      2001 | 1641184375
      2002 | 1650209644
      2003 |   17994540
(6 rows)

Time: 77624.926 ms (01:17.625)

The preceding federated query ran on a two-node DC2.8XL cluster and took 1 minute and 17 seconds to join store sales in Amazon S3, PostgreSQL, and Amazon Redshift with the date dimension table in Amazon Redshift, aggregating and sorting row counts by year. The following code shows its query plan:

dev=# EXPLAIN SELECT extract(year from b.d_date), count(a.ss_sold_date_sk)
FROM store_sales_integrated a
JOIN tpcds.date_dim b on (a.ss_sold_date_sk = b.d_date_sk)
GROUP BY 1
ORDER BY 1;

QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Merge  (cost=1461036320912.29..1461036321094.91 rows=73049 width=8)
   Merge Key: "date_part"('year'::text, b.d_date)
   ->  XN Network  (cost=1461036320912.29..1461036321094.91 rows=73049 width=8)
         Send to leader
         ->  XN Sort  (cost=1461036320912.29..1461036321094.91 rows=73049 width=8)
               Sort Key: "date_part"('year'::text, b.d_date)
               ->  XN HashAggregate  (cost=461036314645.93..461036315011.18 rows=73049 width=8)
                     ->  XN Hash Join DS_DIST_ALL_NONE  (cost=913.11..428113374829.91 rows=6584587963204 width=8)
                           Hash Cond: ("outer".ss_sold_date_sk = "inner".d_date_sk)
                           ->  XN Subquery Scan a  (cost=0.00..263498674836.70 rows=6584587963204 width=4)
                                 ->  XN Append  (cost=0.00..197652795204.66 rows=6584587963204 width=4)
                                       ->  XN Subquery Scan "*SELECT* 1"  (cost=0.00..539836.20 rows=17994540 width=4)
                                             ->  XN PG Query Scan store_sales_live  (cost=0.00..359890.80 rows=17994540 width=4)
                                                   ->  Remote PG Seq Scan uswest.store_sales_live  (cost=0.00..179945.40 rows=17994540 width=4)
                                       ->  XN Subquery Scan "*SELECT* 2"  (cost=0.00..33004193.28 rows=1650209664 width=4)
                                             ->  XN Seq Scan on store_sales_current  (cost=0.00..16502096.64 rows=1650209664 width=4)
                                       ->  XN Subquery Scan "*SELECT* 3"  (cost=0.00..197619251175.18 rows=6582919759000 width=4)
                                             ->  XN Partition Loop  (cost=0.00..131790053585.18 rows=6582919759000 width=4)
                                                   ->  XN Seq Scan PartitionInfo of spectrum.store_sales_historical  (cost=0.00..10.00 rows=1000 width=4)
                                                   ->  XN S3 Query Scan store_sales_historical  (cost=0.00..131658395.18 rows=6582919759 width=0)
                                                         ->  S3 Seq Scan spectrum.store_sales_historical location:"s3://mysamplebucket/historical_store_sales" format:PARQUET (cost=0.00..65829197.59 rows=6582919759 width=0)
                           ->  XN Hash  (cost=730.49..730.49 rows=73049 width=8)
                                 ->  XN Seq Scan on date_dim b  (cost=0.00..730.49 rows=73049 width=8)
(23 rows)

The query plan shows full sequential scans on the three source tables, returning a total of 8.2 billion rows. Because Amazon Redshift Spectrum does not generate statistics for external tables, you manually set the numRows property to the row count of the historical data in Amazon S3. See the following code:

ALTER TABLE spectrum.store_sales_historical SET TABLE PROPERTIES ('numRows' = '6582919759');
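As a quick check of the 8.2 billion figure, the following sketch adds up the row counts of the three scan nodes in the plan and rebuilds the numRows statement. The numrows_ddl helper is a hypothetical name.

```python
# Row counts reported by the scan nodes in the preceding query plan.
SCAN_ROWS = {
    "uswest.store_sales_live": 17_994_540,
    "tpcds.store_sales_current": 1_650_209_664,
    "spectrum.store_sales_historical": 6_582_919_759,
}


def numrows_ddl(table: str, num_rows: int) -> str:
    """Build the ALTER TABLE statement that records a row count for an external table."""
    return f"ALTER TABLE {table} SET TABLE PROPERTIES ('numRows' = '{num_rows}');"


total_rows = sum(SCAN_ROWS.values())
print(f"{total_rows:,} rows, about {total_rows / 1e9:.2f} billion")
print(numrows_ddl("spectrum.store_sales_historical",
                  SCAN_ROWS["spectrum.store_sales_historical"]))
```

If the historical data in Amazon S3 grows, rerun the statement with the new count so the planner's estimates stay accurate.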

You can also join with another dimension table local to Amazon Redshift, this time the 30-million-row customer table, and filter by the column c_birth_country. See the following code:

dev=# SELECT extract(year from b.d_date), count(a.ss_sold_date_sk)
FROM store_sales_integrated a
JOIN tpcds.date_dim b on (a.ss_sold_date_sk = b.d_date_sk)
JOIN tpcds.customer c on (a.ss_customer_sk = c.c_customer_sk)
AND c.c_birth_country = 'UNITED STATES'
GROUP BY 1
ORDER BY 1
;
 date_part |  count
-----------+---------
      1998 | 7299277
      1999 | 7392156
      2000 | 7416905
      2001 | 7347920
      2002 | 7390590
      2003 |   81627
(6 rows)

Time: 77878.586 ms (01:17.879)

The following code shows the query plan:

QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Merge  (cost=1363288861214.20..1363288861396.83 rows=73049 width=8)
   Merge Key: "date_part"('year'::text, b.d_date)
   ->  XN Network  (cost=1363288861214.20..1363288861396.83 rows=73049 width=8)
         Send to leader
         ->  XN Sort  (cost=1363288861214.20..1363288861396.83 rows=73049 width=8)
               Sort Key: "date_part"('year'::text, b.d_date)
               ->  XN HashAggregate  (cost=363288854947.85..363288855313.09 rows=73049 width=8)
                     ->  XN Hash Join DS_DIST_ALL_NONE  (cost=376252.50..363139873158.03 rows=29796357965 width=8)
                           Hash Cond: ("outer".ss_sold_date_sk = "inner".d_date_sk)
                           ->  XN Hash Join DS_BCAST_INNER  (cost=375339.39..362394963295.79 rows=29796357965 width=4)
                                 Hash Cond: ("outer".ss_customer_sk = "inner".c_customer_sk)
                                 ->  XN Subquery Scan a  (cost=0.00..263498674836.70 rows=6584587963204 width=8)
                                       ->  XN Append  (cost=0.00..197652795204.66 rows=6584587963204 width=8)
                                             ->  XN Subquery Scan "*SELECT* 1"  (cost=0.00..539836.20 rows=17994540 width=8)
                                                   ->  XN PG Query Scan store_sales_live  (cost=0.00..359890.80 rows=17994540 width=8)
                                                         ->  Remote PG Seq Scan uswest.store_sales_live  (cost=0.00..179945.40 rows=17994540 width=8)
                                             ->  XN Subquery Scan "*SELECT* 2"  (cost=0.00..33004193.28 rows=1650209664 width=8)
                                                   ->  XN Seq Scan on store_sales_current  (cost=0.00..16502096.64 rows=1650209664 width=8)
                                             ->  XN Subquery Scan "*SELECT* 3"  (cost=0.00..197619251175.18 rows=6582919759000 width=8)
                                                   ->  XN Partition Loop  (cost=0.00..131790053585.18 rows=6582919759000 width=8)
                                                         ->  XN Seq Scan PartitionInfo of spectrum.store_sales_historical  (cost=0.00..10.00 rows=1000 width=4)
                                                         ->  XN S3 Query Scan store_sales_historical  (cost=0.00..131658395.18 rows=6582919759 width=4)
                                                               ->  S3 Seq Scan spectrum.store_sales_historical location:"s3://mysamplebucket/historical_store_sales" format:PARQUET (cost=0.00..65829197.59 rows=6582919759 width=4)
                                 ->  XN Hash  (cost=375000.00..375000.00 rows=135755 width=4)
                                       ->  XN Seq Scan on customer c  (cost=0.00..375000.00 rows=135755 width=4)
                                             Filter: ((c_birth_country)::text = 'UNITED STATES'::text)
                           ->  XN Hash  (cost=730.49..730.49 rows=73049 width=8)
                                 ->  XN Seq Scan on date_dim b  (cost=0.00..730.49 rows=73049 width=8)
(28 rows)

Query performance hardly changed from the previous query. Because the query needs only two columns of the fact table (ss_sold_date_sk and ss_customer_sk), the historical data subquery benefits from Parquet’s columnar structure. Put another way, if the historical data were stored as CSV, all of the data would be scanned, which would degrade performance significantly.

Additionally, the TPC-DS model does not store date values in the store_sales fact table. Instead, a foreign key references the date_dim table. If you plan on implementing something similar but frequently filter by a date column, consider adding that column to the fact table as a sort key, and also using it as a partitioning column in Amazon Redshift Spectrum. That way, Amazon Redshift can skip blocks more efficiently for local data, and for Amazon S3 data it can prune partitions and push filtering criteria down to Amazon Redshift Spectrum.
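Partition pruning is what makes a date filter cheap on the Amazon S3 side. The following conceptual sketch, which is not Amazon Redshift Spectrum's implementation, shows how a one-year filter on the partition key keeps only 365 of the 1,460 ss_sold_date_sk partitions unloaded earlier, assuming the same Julian Day Number encoding of date keys. The function name is hypothetical.

```python
from datetime import date
from typing import Iterable, List

# Python ordinal + offset = Julian Day Number (assumed d_date_sk encoding).
JDN_OFFSET = 1721425


def prune_partitions(partition_values: Iterable[int], start: date, end: date) -> List[int]:
    """Keep only ss_sold_date_sk partitions whose date falls in [start, end)."""
    lo = start.toordinal() + JDN_OFFSET
    hi = end.toordinal() + JDN_OFFSET
    # Only the surviving partitions' files would need to be read from Amazon S3.
    return [v for v in partition_values if lo <= v < hi]


if __name__ == "__main__":
    all_partitions = range(2450816, 2452276)  # 1998-01-02 through 2001-12-31
    kept = prune_partitions(all_partitions, date(2001, 1, 1), date(2002, 1, 1))
    print(f"scan {len(kept)} of {len(all_partitions)} partitions")
```

Pruning happens before any file is opened, so the savings scale with the fraction of partitions the filter excludes, here three quarters.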

Conclusion

Applications of live data integration in real-world scenarios include data discovery, data preparation for machine learning, operational analytics, IoT telemetry analytics, fraud detection, and compliance and security audits. Whereas Amazon Redshift Spectrum extends the reach of Amazon Redshift into the AWS data lake, Federated Query extends its reach into operational databases and beyond.

For more information about data type differences between these databases, see Data Type Differences Between Amazon Redshift and Supported RDS PostgreSQL or Aurora PostgreSQL Databases. For more information about accessing federated data with Amazon Redshift, see Limitations and Considerations When Accessing Federated Data with Amazon Redshift. To learn more about how the feature works, watch this demo.

About the Authors

Tito Mijares is a Data Warehouse Specialist Solutions Architect at AWS. He helps AWS customers adopt and optimize their use of Amazon Redshift. Outside of AWS, he enjoys playing jazz guitar and working on audio recording and playback projects.

Entong Shen is a Senior Software Development Engineer for Amazon Redshift. He has been working on MPP databases for over 8 years and has focused on query optimization, statistics and SQL language features such as stored procedures and federated query. In his spare time, he enjoys listening to music of all genres and working in his succulent garden.

Niranjan Kamat is a software engineer on the Amazon Redshift query processing team. His PhD research focused on interactive querying over large databases. In Redshift, he has worked in different query processing areas such as query optimization, the ANALYZE command and statistics, and federated querying. In his spare time, he enjoys playing with his three-year-old daughter, practicing table tennis (he was ranked in the top 10 in Ohio, with a USATT rating of 2143), and playing chess.

Sriram Krishnamurthy is a Software Development Manager for the Amazon Redshift Query Processing team. He is passionate about databases and has been working on semi-structured data processing and SQL compilation and execution for over 15 years. In his free time, you can find him on the tennis court, often with his two young daughters in tow.