
This article originally appeared on Medium.com. It has been republished with permission from the author.

One of the biggest challenges in making use of data is avoiding the complexity of maintaining N copies of a dataset for N different analytics systems. Dataset sprawl and application silos are quicksand for productivity. Meanwhile, the backups of your important tables sit idle, so perhaps they can do more. This post describes how PrestoSql can help you leverage your backup dataset to enable dev/test and analytics too.

PrestoSql is a distributed query engine capable of bringing SQL analytics to a wide variety of data stores, including an S3 object store like Pure FlashBlade. Running Presto on Kubernetes with FlashBlade and Pure's CSI provisioner provides a simple, flexible, scalable, and enterprise-ready data infrastructure.

But, PrestoSql can also be used as part of a data protection strategy to take advantage of your existing backup data copy for dev/test workloads and analytics. In this blog I will configure the Starburst operator for Presto with FlashBlade S3 and demonstrate three example usages for rapid restoration: standard disaster recovery, dev/test copies of tables, and machine learning.

PrestoSql and Pure are uniquely suited for these rapid restore use cases because they integrate easily with multiple data stores: databases, data warehouses, and data lakes. As a result, you can avoid excess data copies and leave data in the best format or system. PrestoSql queries data where it lives and Pure provides the flexible and performant storage regardless of where the data lives.

There is no single solution for data protection. Native tools (for example, Postgresql's own utilities) and third-party backup software (for example, Kasten K10) are essential parts of a protection strategy. What is unique about backing up tables with PrestoSql is that the data copy also becomes available for other uses, such as dev/test tables or analytics and machine learning workloads. Other tools, such as Sqoop with FlashBlade, can achieve similar flows, though with less flexibility than PrestoSql.

Rapid Restore Scenarios

Presto and FlashBlade invert standard backup workflows by focusing instead on rapid restore opportunities afforded by fast S3 storage:

  1. Restores for disaster recovery
  2. Creation of realistic dev/test tables based on backups
  3. Analytics and machine learning on the backup copy

In other words, after making one copy of a dataset or table, you now have three different ways to leverage that dataset, possibly all at the same time, building on the same tool you can use for large-scale data warehousing.

The flow of backup and possible restorations is illustrated in the figure below:

[Figure: the flow of backup from a primary database to FlashBlade S3 and the possible restorations]

While disaster recovery workflows are possible, this is not the recommended primary means of protecting your databases. Instead, it is an additional layer of a data protection strategy that can both help in certain DR scenarios and enable new use cases with your backups.

The primary database can be a wide variety of technologies; PrestoSql has a wide set of connectors for querying data on different types of storage systems.

Most connectors are for database-like systems that in turn store data on PersistentVolumes, backed by a CSI provisioner such as PSO. Examples include Cassandra, Elasticsearch, Kafka, MongoDB, Mysql, Oracle, Postgresql, and SQL Server.
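For reference, pointing PrestoSql at one of these systems requires only a small catalog definition. Below is a minimal sketch of a Postgresql catalog using the connector's standard properties; the host, database, and credentials are placeholders, and with the Starburst operator these properties are supplied through the cluster yaml rather than a standalone file.

connector.name=postgresql
connection-url=jdbc:postgresql://postgres.example.internal:5432/mydb
connection-user=presto
connection-password=REPLACE_ME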

The Hive connector is unique: it allows Presto to directly query tables stored on an open S3 object store “data lake” such as FlashBlade. The Hive connector only uses a Hive Metastore for keeping metadata about tables on any compatible data lake. Storing backup copies of tables on FlashBlade S3 through the Hive connector has the advantage of performance for rapid restores and the flexibility of allowing other tools, e.g., Apache Spark, to also read the table data from S3.

Using multiple connectors in the same PrestoSql instance crucially enables queries that combine data from different sources or that read from one source and write to another. For example, a CTAS (CREATE TABLE AS SELECT) statement can create a table using the Hive connector as the destination and the Postgresql connector as the source. This CTAS statement effectively creates a backup copy.

Therefore, the cluster architecture I describe includes three layers as illustrated in the figure below:

  1. The PrestoSql service coordinated by Starburst
  2. Data services managing datasets, tables, and schemas, accessed through PrestoSql connectors for Postgresql and Hive
  3. Reliable data storage, block/filesystem/object, provided by Pure

[Figure: the three-layer architecture of Starburst PrestoSql, data services, and Pure storage]

For the examples here, I use Postgresql as the primary database with the underlying PersistentVolume provided by FlashArray via PSO.

How to Connect Starburst PrestoSql and FlashBlade

Installing the Starburst Presto operator is straightforward following their instructions.

The next step is to create a Presto cluster with the necessary configuration to connect to your data stores. For FlashBlade, we leverage the Hive connector to access and store tables on S3.

Below is an example of my configuration for Starburst and FlashBlade:

hive:
  awsSecretName: presto-hive-s3-keys
  internalMetastore:
    image:
      pullPolicy: Always
    s3Endpoint: "https://10.62.64.200"
    internalPostgreSql:
      enabled: true
      storage:
        className: pure-file
    memory: 0.5Gi
    cpu: 0.5
  additionalProperties: |
    hive.allow-drop-table=true
    hive.s3.path-style-access=true
    hive.s3.ssl.enabled=false
    hive.s3.endpoint=10.62.64.200

The data VIP of the FlashBlade (“10.62.64.200” in my case) is specified for s3Endpoint as well as in the additionalProperties section for hive.s3.endpoint. The endpoint is configured in two places: one for the Hive metastore, which issues metadata-only operations, and the other for the Presto workers, which use their own S3 data paths for reading and writing data.

The S3 access keys are encoded in the secret referenced by “awsSecretName.” I created this secret as follows:

kubectl create secret generic presto-hive-s3-keys --from-literal=AWS_ACCESS_KEY_ID="$ACCESS" --from-literal=AWS_SECRET_ACCESS_KEY="$SECRET"

This configuration uses an internal Hive metastore service backed by a Postgresql database; here, internal means that the Starburst operator manages it for us. In order to use FlashBlade for the Postgresql instance's storage, I customize it by setting className to “pure-file.” Note that this is a wholly separate Postgresql instance from the primary database that holds user data.

Within the Hive configuration above, the specific section that configures the internal Postgresql instance to use PSO for storage is:

internalPostgreSql:
  enabled: true
  storage:
    className: pure-file

The Starburst operator dynamically adapts the Presto cluster based on changes to the yaml specification:

  • If I modify the yaml to change the default database used by the Postgresql connector, the Starburst cluster automatically restarts and reconfigures.
  • If I increase the node count, the operator automatically scales out the workers.
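Either change amounts to editing the cluster yaml and re-applying it; the operator then reconciles the running cluster to match. The file name below is simply whatever you saved the cluster definition as:

> kubectl apply -f example-presto.yaml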

To verify connectivity and start working with data, I create a Presto CLI instance to connect and start issuing queries with the following command:

> COORD=example-presto  # my cluster name

> kubectl run presto-cli --rm -i --tty --image starburstdata/presto -- presto-cli --server https://$COORD.default.svc.cluster.local:8080

I access the Presto administrative UI using port-forwarding, but the default behavior is to create a NodePort service that allows accessing the UI from any node in the Kubernetes cluster.

> kubectl port-forward --address 0.0.0.0 service/example-presto 8080
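With the port-forward running, the UI is then reachable at http://localhost:8080 on the local machine.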

Now, create an external table based on data already on S3 and start with some SQL queries:

> CREATE TABLE IF NOT EXISTS hive.default.reddit (name varchar, body varchar) WITH (format='json', external_location='s3a://joshuarobinson/reddit.json');

> SELECT COUNT(*) FROM hive.default.reddit;

Presto can query existing datasets on S3 using external tables, such as the JSON data in bucket ‘joshuarobinson’ above, while also using S3 for storage of internal, Presto-managed tables.

How to Monitor with Prometheus

The Starburst operator includes built-in support for monitoring Presto using Prometheus, fitting easily into Kubernetes-based monitoring configurations. Using a standard monitoring tool like Prometheus allows you to also include FlashArrays and FlashBlades with the Pure Exporter.

See my previous blog for a specific example yaml file for how to monitor Starburst and FlashBlade together.

Workflows with Rapid Restore

This section shows examples of how to orchestrate SQL-based workflows based on rapid restore scenarios.

Preliminaries: The Backup Copy

The first step is to use PrestoSql to orchestrate taking periodic backups of a database. In all of these examples, I use the Hive connector and S3 as the storage location for the table backups.

First create a destination schema to store the backups and specify the storage location using an s3a address:

presto> CREATE SCHEMA IF NOT EXISTS hive.ph WITH (location = 's3a://joshuarobinson/warehouse/ph');

The s3a address is specified here because this is how the Hive metastore accesses S3, using the S3A adaptor from Hadoop. Note that PrestoSql itself reads and writes to S3 using a newer, more efficient S3 layer. The command above also assumes the bucket (“joshuarobinson”) already exists; it can be created through the FlashBlade UI or with external tools like s5cmd.
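If the bucket does not exist yet, creating it from the command line is a one-liner; the sketch below uses s5cmd and assumes the same S3 access keys are exported in the environment:

> s5cmd --endpoint-url https://10.62.64.200 mb s3://joshuarobinson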

You can now create a table backup using straightforward CTAS statements in Presto. In the two examples below, note that the destination and source tables live in different connectors, giving you the ability to back up from a relational database (Postgresql here) to a Parquet table on S3.

presto> CREATE TABLE hive.ph.ph_logs WITH (format='parquet') AS SELECT * FROM postgresql.internal.ph_logs;

presto> CREATE TABLE hive.ph.ph_logs WITH (format='parquet', bucketed_by=ARRAY['id'], bucket_count=50) AS SELECT * FROM postgresql.internal.ph_logs;
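To make these backups periodic rather than one-off, the same CTAS can be wrapped in a scheduled job. The yaml below is only a minimal sketch using a Kubernetes CronJob to run the Presto CLI non-interactively; the schedule, job name, and destination table name are placeholder assumptions, and a real schedule would need to rotate or date-stamp the destination table since CTAS will not overwrite an existing table.

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: ph-logs-backup
spec:
  schedule: "0 2 * * *"  # nightly at 2am
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never
          containers:
          - name: presto-cli
            image: starburstdata/presto
            # Run a single CTAS backup statement and exit.
            command: ["presto-cli",
                      "--server", "https://example-presto.default.svc.cluster.local:8080",
                      "--execute",
                      "CREATE TABLE hive.ph.ph_logs_nightly WITH (format='parquet') AS SELECT * FROM postgresql.internal.ph_logs"]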

Usage 1: Basic Restore

As a first step, I can reverse the original backup and re-create my table in the postgresql instance as a CTAS from the Parquet data stored on S3.

presto> CREATE TABLE postgresql.restored.ph_logs AS SELECT * FROM hive.ph.ph_logs;
CREATE TABLE: 32170139 rows

Query 20200501_160205_00202_si7m6, FINISHED, 6 nodes
Splits: 106 total, 106 done (100.00%)
6:04 [32.2M rows, 97.4MB] [88.5K rows/s, 274KB/s]
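Note that CREATE TABLE AS will not overwrite an existing table, so when restoring over a previous copy, drop the old one first (or restore under a new name):

presto> DROP TABLE IF EXISTS postgresql.restored.ph_logs;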

A restore like this is useful as part of a “defense in depth” approach to disaster recovery, with human error being the most common type of disaster.

Usage 2a: Cloning a Table for Dev/Test Usage

In this example, we will recreate the table in a different, test database instance named “postgresql-dev.” This will create a separate copy of the database suitable for testing new application versions.

presto> CREATE TABLE "postgresql-dev".public.atop_mem AS SELECT * FROM hive.pgs.atop_mem;

The following screenshot from the FlashBlade UI shows how quickly my example 2GB table can be cloned. The data is read from FlashBlade S3 into Presto immediately; the bottleneck is then how quickly the destination database can ingest it. Automation around dev/test workflows requires clones to complete in seconds, not hours, so that test feedback is responsive and therefore useful.

[Screenshot: FlashBlade UI dashboard during the clone of the example 2GB table]

Usage 2b: Secure Downgrade

Taking the dev/test workflow further, the restoration of the table can also involve useful transformations!

Creating databases for test purposes often requires an extra step to sufficiently anonymize sensitive data, i.e. a “secure downgrade.” This is necessary because testing is often done in a different security environment and test code might introduce security bugs.

CREATE TABLE "postgresql-dev".public.customer_logs AS
SELECT to_hex(md5(to_utf8(customer))) AS customer_anon,
    path,
    filename
FROM hive.pgs.ph_logs;

The query above creates a new table using a transformation; the path and filename columns are copied as-is but the “customer” name is transformed using the “md5” SQL function. The resulting table contains a hashed string as the anonymized customer name, more suitable for working in a dev/test environment where security controls are necessarily less strict.

This is a simple example that illustrates the ability to transform the source data as part of the “clone” operation to make it more suitable for test usage. PrestoSql lets you use any standard SQL functionality to apply transformations during the table clone or restore.

Usage 3a: Reports on the “Backup”

Now you can build dashboards using Presto SQL queries against the tables on S3. Using standard SQL, these can be the same reporting queries you were using before, just targeting the tables stored on S3.

An example SQL query to create a summary report in one of my tables:

presto> SELECT customer,system,hostname,AVG(ticks_idle / (ticks_per_sec * cpu_count)) as busy FROM hive.pgs.atop_cpu GROUP BY customer,system,hostname ORDER BY busy DESC LIMIT 10;

The advantage of this approach is that you can offload queries from running on your primary database where they can negatively impact performance. You can also leverage columnar encoding formats like Parquet for far faster analytics queries.

Usage 3b: Machine Learning and Beyond

But there are many types of analytics not well-suited for SQL, like machine learning based on your table data. Conveniently, Presto stores tables in open formats like Parquet such that tools like Apache Spark can easily read and access that data on FlashBlade.

Apache Spark works well with Presto tables on S3 because Spark easily loads the data with schema included!

tablepath = "s3a://joshuarobinson/warehouse/pgs/atop_cpu/"
tb = spark.read.parquet(tablepath)
tb.printSchema()

root
 |-- customer: string (nullable = true)
 |-- system: string (nullable = true)
 |-- hostname: string (nullable = true)
 |-- epoch: long (nullable = true)

Now that a dataframe is loaded, I construct a machine learning pipeline that uses a random forest classifier to try and make sense of my data.

# Imports needed for the pipeline below.
import pyspark.ml
import pyspark.ml.classification
import pyspark.ml.feature
from pyspark.sql.functions import col

# First, simple transformations of the data
tb = tb.withColumn("total_ticks", col("cpu_count") * col("ticks_per_sec") * col("interval"))

tb_norm = tb.withColumn("pct_system", col("ticks_system") / col("total_ticks")) \
  .withColumn("pct_user", col("ticks_user") / col("total_ticks")) \
  .withColumn("pct_user_nice", col("ticks_user_nice") / col("total_ticks")) \
  .withColumn("pct_wait", col("ticks_wait") / col("total_ticks")) \
  .withColumn("pct_irq", col("ticks_irq") / col("total_ticks")) \
  .withColumn("pct_irqsoft", col("ticks_irqsoft") / col("total_ticks")) \
  .withColumn("pct_guest", col("ticks_guest") / col("total_ticks"))

# Hold out a test set; the 80/20 split ratio here is an arbitrary choice.
trainingData, testData = tb_norm.randomSplit([0.8, 0.2])

# Create the feature vector and labels and the ML training pipeline.
labelIndexer = pyspark.ml.feature.StringIndexer(inputCol="customer", outputCol="indexedLabel").fit(tb_norm)
assembler = pyspark.ml.feature.VectorAssembler(
    inputCols=["pct_system", "pct_user", "pct_user_nice", "pct_wait", "pct_irq", "pct_irqsoft", "pct_guest"],
    outputCol="features")
rf = pyspark.ml.classification.RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", numTrees=10)
labelConverter = pyspark.ml.feature.IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)
pipeline = pyspark.ml.Pipeline(stages=[labelIndexer, assembler, rf, labelConverter])

model = pipeline.fit(trainingData)
predictions = model.transform(testData)
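As a quick sanity check (not part of the original pipeline above), the held-out predictions can be scored with one of SparkML's built-in evaluators:

import pyspark.ml.evaluation

# Measure accuracy of the predicted customer labels on the test split.
evaluator = pyspark.ml.evaluation.MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("Test accuracy: %g" % evaluator.evaluate(predictions))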

This is a simple example using SparkML, but note that using data from a Presto table is straightforward: the schema is automatically detected and then all SparkML code proceeds as normal with no extra hooks or boilerplate needed. Sharing the data between Presto and SparkML is trivial because of open data formats like Parquet and a flexible data hub built on FlashBlade S3.

Summary

Pure FlashBlade and Starburst PrestoSql provide a modern analytics infrastructure uniquely capable of spanning multiple databases and data lakes. Because of their flexibility and performance, making a backup copy of a table leads to multiple ways to leverage rapid restores. A single data table on FlashBlade S3 can power 1) standard disaster recovery, 2) creation of dev/test instances, and 3) advanced analytics and machine learning.