{
  "id": "streaming",
  "title": "Streaming features in Featureform",
  "url": "https://redis.io/docs/latest/develop/ai/featureform/streaming/",
  "summary": "Build stream-backed features with Kafka, streaming transformations, and Redis serving.",
  "tags": [],
  "last_updated": "2026-04-09T10:29:34-04:00",
  "page_type": "content",
  "content_hash": "eab723a4b29af2d8ec1b46f3a17d554743847c364c7bf60e1253f211fec411a2",
  "sections": [
    {
      "id": "overview",
      "title": "Overview",
      "role": "overview",
      "text": "Featureform supports stream-backed feature workflows for use cases that need continuously updated online values while preserving historical correctness for training.\n\nThis page walks through the documented SDK workflow: register a Kafka provider and topic, define a streaming transformation, define features with `from_stream(...)`, optionally backfill them from a batch source, materialize a feature view, and serve it from Redis."
    },
    {
      "id": "prerequisites",
      "title": "Prerequisites",
      "role": "content",
      "text": "Before you build streaming features, you need:\n\n- a running Featureform deployment\n- a registered Redis online store\n- a stream-capable compute provider such as Spark or Databricks\n- a Kafka topic that contains the events you want to transform into features"
    },
    {
      "id": "register-kafka",
      "title": "Register Kafka",
      "role": "content",
      "text": "[code example]"
    },
    {
      "id": "register-a-kafka-topic",
      "title": "Register a Kafka topic",
      "role": "content",
      "text": "[code example]"
    },
    {
      "id": "define-a-streaming-transformation",
      "title": "Define a streaming transformation",
      "role": "content",
      "text": "[code example]"
    },
    {
      "id": "define-features-from-a-stream",
      "title": "Define features from a stream",
      "role": "content",
      "text": "[code example]"
    },
    {
      "id": "backfill-from-batch-history",
      "title": "Backfill from batch history",
      "role": "content",
      "text": "If the stream is the online source of truth but you already have historical data in batch storage, backfill the stream-backed feature from that batch source so that training data also covers events that predate the stream:\n\n[code example]"
    },
    {
      "id": "register-metadata-and-materialize",
      "title": "Register metadata and materialize",
      "role": "content",
      "text": "[code example]"
    },
    {
      "id": "serve-streaming-feature-values",
      "title": "Serve streaming feature values",
      "role": "content",
      "text": "[code example]"
    },
    {
      "id": "guidance",
      "title": "Guidance",
      "role": "content",
      "text": "- Parse and cast stream payloads in the transformation layer so that features receive typed columns.\n- Reuse one transformed stream for multiple features.\n- Keep the backfill column mapping (`entity`, `values`, `timestamp`) aligned with the stream definition so training and serving stay consistent.\n- Use only documented parameters and status names in production code and examples."
    }
  ],
  "examples": [
    {
      "id": "register-kafka-ex0",
      "language": "python",
      "code": "import featureform as ff\nfrom featureform.config.file_stores import KafkaConfig\n\n# Register the Kafka cluster as a Featureform provider.\nkafka = ff.register_kafka(\n    name=\"transactions-kafka\",\n    kafka_config=KafkaConfig(\n        bootstrap_servers=[\"kafka-1:9092\", \"kafka-2:9092\"],\n        use_msk_iam_auth=False,\n        options={\n            \"kafka.security.protocol\": \"SASL_SSL\",\n            \"kafka.sasl.mechanism\": \"SCRAM-SHA-512\",\n        },\n    ),\n)",
      "section_id": "register-kafka"
    },
    {
      "id": "register-a-kafka-topic-ex0",
      "language": "python",
      "code": "transactions_stream = kafka.register_kafka_topic(\n    name=\"transactions-stream\",\n    topic=\"transactions\",\n    description=\"Raw transaction events from Kafka\",\n)",
      "section_id": "register-a-kafka-topic"
    },
    {
      "id": "define-a-streaming-transformation-ex0",
      "language": "python",
      "code": "# Parse each Kafka message's JSON payload into typed columns.\n@spark.streaming_sql_transformation(\n    name=\"parsed_transactions\",\n    inputs=[transactions_stream],\n)\ndef parsed_transactions(source):\n    return \"\"\"\n        SELECT\n            get_json_object(CAST(value AS STRING), '$.user_id') AS user_id,\n            CAST(get_json_object(CAST(value AS STRING), '$.amount') AS DOUBLE) AS amount,\n            CAST(get_json_object(CAST(value AS STRING), '$.event_time') AS TIMESTAMP) AS event_time\n        FROM {{ source }}\n    \"\"\"",
      "section_id": "define-a-streaming-transformation"
    },
    {
      "id": "define-features-from-a-stream-ex0",
      "language": "python",
      "code": "from datetime import timedelta\n\nwindow = timedelta(days=7)\n\n@ff.entity\nclass User:\n    rolling_amount = (\n        ff.Feature()\n        .from_stream(\n            parsed_transactions,\n            entity=\"user_id\",\n            values=\"amount\",\n            timestamp=\"event_time\",\n        )\n        .aggregate(\n            function=ff.AggregateFunction.SUM,\n            windows=[window],\n        )\n    )",
      "section_id": "define-features-from-a-stream"
    },
    {
      "id": "backfill-from-batch-history-ex0",
      "language": "python",
      "code": "# Historical transactions in batch storage.\nhistorical_transactions = spark.register_delta_table(\n    name=\"historical_transactions\",\n    database=\"ml_catalog.featureform\",\n    table=\"historical_transactions\",\n)\n\n@ff.entity\nclass User:\n    rolling_amount = (\n        ff.Feature()\n        .from_stream(\n            parsed_transactions,\n            entity=\"user_id\",\n            values=\"amount\",\n            timestamp=\"event_time\",\n        )\n        # Use the same column mapping as the stream definition.\n        .backfill_from(\n            source=historical_transactions,\n            entity=\"user_id\",\n            values=\"amount\",\n            timestamp=\"event_time\",\n        )\n        .aggregate(\n            function=ff.AggregateFunction.SUM,\n            windows=[window],\n        )\n    )",
      "section_id": "backfill-from-batch-history"
    },
    {
      "id": "register-metadata-and-materialize-ex0",
      "language": "python",
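      "code": "# Assumes `client` is a Featureform client and `redis` is the\n# registered Redis online store from the prerequisites.\nclient.apply()\n\n# Materialize the 7-day rolling sum into Redis for online serving.\nclient.materialize_feature_view(\n    view_name=\"user_streaming_features\",\n    inference_store=redis,\n    features=[User.rolling_amount[window]],\n)",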
      "section_id": "register-metadata-and-materialize"
    },
    {
      "id": "serve-streaming-feature-values-ex0",
      "language": "python",
      "code": "result = client.serve_feature_view(\n    view=\"user_streaming_features\",\n    entity_ids=[\"user_1\", \"user_2\"],\n)",
      "section_id": "serve-streaming-feature-values"
    }
  ]
}
