Start by creating a Kinesis Data Firehose delivery stream, following the steps that we completed in the last exercise.
In the last exercise, we disabled data transformation using Lambda. This time, enable the Transform source records with AWS Lambda option.
Once this is enabled, create a Lambda function to transform the incoming data:
Amazon already provides some sample functions. Click on Create new to open the list of transformation functions provided by AWS. Let's choose General Firehose Processing:
This opens up the Lambda function window. Here, you need to provide the name of the function, along with the IAM role information:
Edit the code inline, replacing the existing code with the code provided in the json2csv_transform.js file, under the code section. Keep the rest of the settings as they are:
Once the Lambda function has been created, go back to the Firehose screen and configure the rest of the settings, such as the Amazon S3 destination bucket, in the same way as the Firehose destination that we configured in the last exercise:
Also, update the IAM role in the Firehose configuration so that it grants the access required to invoke the Lambda function:
Everything else remains the same as in the last exercise.
Send the test data from the Test with demo data section by clicking on Start sending demo data:
Go to the S3 location that we configured earlier to receive the data, and you should see the data file, as shown here:
Upon downloading this data file and opening it with Notepad, you should see the data in CSV format, as shown here:
Ensure that your Kinesis Data Analytics application is in working condition and that you are able to perform real-time analysis, as we did in the last exercise:
Create an S3 bucket and upload the ka-reference-data.json file into it:
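The ka-reference-data.json file ships with the book's code bundle. For orientation, it maps ticker symbols to company names, matching the Ticker and Company columns that the join query uses later in this exercise; a hypothetical shape, with illustrative values only, would be:

```json
{"Ticker": "QXZ", "Company": "QXZ Corp"}
{"Ticker": "ABC", "Company": "ABC Inc"}
```

For the join to produce output, the Ticker values in the file must match ticker symbols that actually appear in the streaming data.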
Go to the Kinesis Data Analytics application page and click on Connect reference data. Provide the bucket, S3 object, and table details, and populate the schema using schema discovery:
You will notice in the preceding screenshot that the Kinesis application creates an IAM role with the required access.
Schema discovery will detect the schema for the reference data file and show you the sample data:
Click on the Save and close button. You will have successfully added the reference data:
Now, you should have both the real-time streaming data and the reference data available in the Kinesis Data Analytics application. The following screenshot shows the real-time streaming data:

The following screenshot shows the added reference data:
Go to the SQL prompt and write a SQL statement that joins the real-time streaming data with the reference data and outputs the details of the companies whose names are provided in the reference file.
Run the following query in the SQL prompt. In this query, we left join SOURCE_SQL_STREAM_001 with the ka_reference_data dataset and filter out any rows where the company name is null:
CREATE STREAM "KINESIS_SQL_STREAM" (
    ticker_symbol VARCHAR(14),
    "Company_Name" VARCHAR(30),
    sector VARCHAR(22),
    change DOUBLE,
    price DOUBLE);

CREATE PUMP "STREAM_PUMP" AS
INSERT INTO "KINESIS_SQL_STREAM"
SELECT STREAM ticker_symbol, "kar"."Company", sector, change, price
FROM "SOURCE_SQL_STREAM_001"
LEFT JOIN "ka_reference_data" AS "kar"
    ON "SOURCE_SQL_STREAM_001".ticker_symbol = "kar"."Ticker"
WHERE "kar"."Company" IS NOT NULL;
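To make the join semantics concrete: a LEFT JOIN followed by an IS NOT NULL filter on the reference side effectively behaves as an inner join, keeping only the stream records whose ticker appears in the reference data. The following is a rough JavaScript analogue of that logic, using hypothetical data; it is not part of the exercise itself:

```javascript
'use strict';

// Stand-in for the ka_reference_data table (hypothetical rows).
const referenceData = [
    { Ticker: 'ABC', Company: 'ABC Corp' },
];

// LEFT JOIN on ticker, then drop rows where no company was matched,
// mirroring the WHERE "kar"."Company" IS NOT NULL filter.
function joinWithReference(streamRecords, reference) {
    const byTicker = new Map(reference.map((r) => [r.Ticker, r.Company]));
    return streamRecords
        .map((rec) => ({ ...rec, Company_Name: byTicker.get(rec.ticker_symbol) || null }))
        .filter((rec) => rec.Company_Name !== null);
}
```

For example, joining a stream containing tickers ABC and XYZ against the reference row above would keep only the ABC record, enriched with Company_Name.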
You should see the output with both the ticker symbol and the company name in real time. It should refresh every few minutes:
This concludes our activity on adding reference data and using it to perform real-time data analytics on Amazon Kinesis Data Analytics.