
KinesisStream

Reference doc for the `sst.aws.KinesisStream` component.

The KinesisStream component lets you add an Amazon Kinesis Data Stream to your app.

Minimal example

sst.config.ts
const stream = new sst.aws.KinesisStream("MyStream");

Subscribe to a stream

sst.config.ts
stream.subscribe("MySubscriber", "src/subscriber.handler");
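
The subscriber function receives records in batches, and each record's payload arrives base64-encoded. The following is a minimal sketch of what a handler at `src/subscriber.handler` might look like, assuming the standard Lambda event shape for Kinesis. The `KinesisEventLike` type and the `decodeRecords` helper are hypothetical names introduced here for illustration; in a real project you would likely use the `KinesisStreamEvent` type from `@types/aws-lambda` instead.

```typescript
// src/subscriber.ts (hypothetical file matching "src/subscriber.handler")

// Minimal local stand-in for the Lambda Kinesis event shape.
// Assumption: only the fields used below are declared.
interface KinesisEventLike {
  Records: {
    kinesis: {
      partitionKey: string;
      sequenceNumber: string;
      data: string; // base64-encoded payload
    };
  }[];
}

// Decode every record in the batch into a parsed JSON payload.
export function decodeRecords(event: KinesisEventLike): unknown[] {
  return event.Records.map((record) => {
    const text = Buffer.from(record.kinesis.data, "base64").toString("utf8");
    return JSON.parse(text);
  });
}

// Entry point referenced by the subscription.
export async function handler(event: KinesisEventLike): Promise<void> {
  for (const payload of decodeRecords(event)) {
    console.log("Received record:", JSON.stringify(payload));
  }
}
```

This sketch assumes every payload is JSON; records written in another format would need their own decoding step.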

You can link the stream to other resources, like a function or your Next.js app.

sst.config.ts
new sst.aws.Nextjs("MyWeb", {
  link: [stream]
});

Once linked, you can write to the stream from your function code.

app/page.tsx
import { Resource } from "sst";
import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";

const client = new KinesisClient();

await client.send(new PutRecordCommand({
  StreamName: Resource.MyStream.name,
  // Data must be binary, so encode the JSON string as UTF-8 bytes.
  Data: new TextEncoder().encode(JSON.stringify({ foo: "bar" })),
  PartitionKey: "myKey",
}));
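
When writing many events, it can help to prepare them as batches first. The sketch below is an illustration and not part of the SST API: `toPutRecordsBatches` is a hypothetical helper that encodes events into entries shaped like the `Records` input of the AWS SDK's `PutRecordsCommand` and groups them into chunks. The default chunk size of 500 reflects the per-request record limit of the Kinesis `PutRecords` API; treat it as an assumption to verify against the current service limits.

```typescript
// Shape of one entry in the `Records` array of a PutRecords request.
interface PutRecordsEntry {
  Data: Uint8Array;
  PartitionKey: string;
}

// Hypothetical helper: encode events as JSON bytes and split them into batches.
export function toPutRecordsBatches<T>(
  events: T[],
  partitionKeyOf: (event: T) => string,
  batchSize = 500,
): PutRecordsEntry[][] {
  if (batchSize < 1) throw new Error("batchSize must be at least 1");
  const encoder = new TextEncoder();
  const batches: PutRecordsEntry[][] = [];
  for (let i = 0; i < events.length; i += batchSize) {
    const chunk = events.slice(i, i + batchSize).map((event) => ({
      Data: encoder.encode(JSON.stringify(event)),
      PartitionKey: partitionKeyOf(event),
    }));
    batches.push(chunk);
  }
  return batches;
}
```

Each batch could then be sent with `new PutRecordsCommand({ StreamName: Resource.MyStream.name, Records: batch })`.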

Constructor

new KinesisStream(name, args?, opts?)

Parameters

  • name string
  • args? KinesisStreamArgs
  • opts? ComponentResourceOptions

KinesisStreamArgs

transform?

Type Object

Transform how this component creates its underlying resources.

transform.stream?

Type StreamArgs | (args: StreamArgs, opts: ComponentResourceOptions, name: string) => void

Transform the Kinesis stream resource.

Properties

arn

Type Output<string>

name

Type Output<string>

nodes

Type Object

The underlying resources this component creates.

nodes.stream

Type Stream

The Amazon Kinesis Data Stream.

SDK

Use the SDK in your runtime to interact with your infrastructure.


When the stream is linked, the following is accessible through the Resource object in the SDK.

  • name string

Methods

subscribe

subscribe(name, subscriber, args?)

Parameters

  • name string

    The name of the subscriber.
  • subscriber Input<string | FunctionArgs | "arn:aws:lambda:${string}">

    The function that’ll be notified.
  • args? KinesisStreamLambdaSubscriberArgs

    Configure the subscription.

Returns Output<KinesisStreamLambdaSubscriber>

Subscribe to the Kinesis stream.

sst.config.ts
stream.subscribe("MySubscriber", "src/subscriber.handler");

Add a filter to the subscription.

sst.config.ts
stream.subscribe("MySubscriber", "src/subscriber.handler", {
  filters: [
    {
      data: {
        order: {
          type: ["buy"],
        },
      },
    },
  ],
});

Customize the subscriber function.

sst.config.ts
stream.subscribe("MySubscriber", {
  handler: "src/subscriber.handler",
  timeout: "60 seconds"
});

Or pass in the ARN of an existing Lambda function.

sst.config.ts
stream.subscribe("MySubscriber", "arn:aws:lambda:us-east-1:123456789012:function:my-function");

static subscribe

KinesisStream.subscribe(name, streamArn, subscriber, args?)

Parameters

  • name string

    The name of the subscriber.
  • streamArn Input<string>

    The ARN of the Kinesis Stream to subscribe to.
  • subscriber Input<string | FunctionArgs | "arn:aws:lambda:${string}">

    The function that’ll be notified.
  • args? KinesisStreamLambdaSubscriberArgs

    Configure the subscription.

Returns Output<KinesisStreamLambdaSubscriber>

Subscribe to a Kinesis stream that was not created in your app.

For example, let’s say you have the ARN of an existing Kinesis stream.

sst.config.ts
const streamArn = "arn:aws:kinesis:us-east-1:123456789012:stream/MyStream";

You can subscribe to it by passing in the ARN.

sst.config.ts
sst.aws.KinesisStream.subscribe("MySubscriber", streamArn, "src/subscriber.handler");

Add a filter to the subscription.

sst.config.ts
sst.aws.KinesisStream.subscribe("MySubscriber", streamArn, "src/subscriber.handler", {
  filters: [
    {
      data: {
        order: {
          type: ["buy"],
        },
      },
    },
  ],
});

Customize the subscriber function.

sst.config.ts
sst.aws.KinesisStream.subscribe("MySubscriber", streamArn, {
  handler: "src/subscriber.handler",
  timeout: "60 seconds"
});

KinesisStreamLambdaSubscriberArgs

filters?

Type Input<Input<Record<string, any>>[]>

Filter the events that’ll be processed by the subscriber function.

You can pass in up to 5 different filter policies. These will be logically ORed together, meaning that if any single policy matches, the record will be processed. Learn more about the filter rule syntax.

For example, suppose your Kinesis stream contains events in this JSON format.

{
  "record": 12345,
  "order": {
    "type": "buy",
    "stock": "ANYCO",
    "quantity": 1000
  }
}

To process only those events where the type is buy, use the following filter.

{
  filters: [
    {
      data: {
        order: {
          type: ["buy"],
        },
      },
    },
  ],
}
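
To illustrate the OR semantics described above, here is a simplified local model of how filter policies select records. It is only a sketch: `matchesPattern` and `matchesAnyFilter` are hypothetical helpers that handle exact-value matching (an array of allowed values per leaf field) and none of the richer operators in the filter rule syntax. The actual filtering is performed by AWS before your function is invoked.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };
type Pattern = { [key: string]: Json[] | Pattern };

// A pattern matches when every leaf lists the record's value at that path.
export function matchesPattern(pattern: Pattern, value: Json): boolean {
  if (value === null || typeof value !== "object" || Array.isArray(value)) {
    return false;
  }
  const obj: { [key: string]: Json } = value;
  return Object.entries(pattern).every(([key, expected]) => {
    const actual = obj[key];
    if (actual === undefined) return false;
    if (Array.isArray(expected)) {
      // Leaf: the record's value must equal one of the allowed values.
      return expected.some((allowed) => allowed === actual);
    }
    // Nested object: recurse into the matching field of the record.
    return matchesPattern(expected, actual);
  });
}

// Filters are ORed: a record is processed if any single filter matches.
// The `data` key in a filter refers to the decoded record payload.
export function matchesAnyFilter(filters: Pattern[], payload: Json): boolean {
  return filters.some((filter) => matchesPattern(filter, { data: payload }));
}
```

With the filter shown above, a payload whose `order.type` is `"buy"` would match in this model, while one whose `order.type` is `"sell"` would not.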

transform?

Type Object

Transform how this component creates its underlying resources.

transform.eventSourceMapping?

Type EventSourceMappingArgs | (args: EventSourceMappingArgs, opts: ComponentResourceOptions, name: string) => void

Transform the Lambda Event Source Mapping resource.