상태 저장 애플리케이션 예제

이 문서에는 사용자 지정 상태 저장 애플리케이션에 대한 코드 예제가 포함되어 있습니다. Databricks는 집계 및 조인과 같은 일반적인 작업에 기본 제공 상태 저장 메서드를 사용하는 것이 좋습니다.

이 문서의 패턴은 Databricks Runtime 16.2 이상에서 공개 미리 보기 사용할 수 있는 transformWithState 연산자 및 관련 클래스를 사용합니다. 사용자 지정 상태 저장 애플리케이션빌드를 참조하세요.


Python은 transformWithStateInPandas 연산자를 사용하여 동일한 기능을 제공합니다. 아래 예제에서는 Python 및 Scala에서 코드를 제공합니다.

요구 사항

transformWithState 연산자와 관련 API 및 클래스에는 다음과 같은 요구 사항이 있습니다.

  • Databricks Runtime 16.2 이상에서 사용할 수 있습니다.
  • 컴퓨팅은 전용 또는 격리되지 않은 액세스 모드를 사용해야 합니다.
  • RocksDB 상태 저장소 공급자를 사용해야 합니다. Databricks는 컴퓨팅 구성의 일부로 RocksDB를 사용하도록 설정하는 것이 좋습니다.


현재 세션에 대해 RocksDB 상태 저장소 공급자를 사용하도록 설정하려면 다음을 실행합니다.

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

느리게 변화하는 차원(SCD) 유형 1

다음 코드는 transformWithState사용하여 SCD 형식 1을 구현하는 예제입니다. SCD 형식 1은 지정된 필드에 대한 최신 값만 추적합니다.


스트리밍 테이블 및 APPLY CHANGES INTO 사용하여 Delta Lake 지원 테이블을 사용하여 SCD 형식 1 또는 형식 2를 구현할 수 있습니다. 이 예제에서는 상태 저장소에서 SCD 형식 1을 구현하여 거의 실시간 애플리케이션에 대한 대기 시간을 낮춥니다.


# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType
from typing import Iterator

# Set the state store provider to RocksDB
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

# Define the output schema for the streaming query
output_schema = StructType([
    StructField("user", StringType(), True),
    StructField("time", LongType(), True),
    StructField("location", StringType(), True)

# Define a custom StatefulProcessor for slowly changing dimension type 1 (SCD1) operations
class SCDType1StatefulProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define the schema for the state value
        value_state_schema = StructType([
            StructField("user", StringType(), True),
            StructField("time", LongType(), True),
            StructField("location", StringType(), True)
        # Initialize the state to store the latest location for each user
        self.latest_location = handle.getValueState("latestLocation", value_state_schema)

    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
        # Find the row with the maximum time value
        max_row = None
        max_time = float('-inf')
        for pdf in rows:
            for _, pd_row in pdf.iterrows():
                time_value = pd_row["time"]
                if time_value > max_time:
                    max_time = time_value
                    max_row = tuple(pd_row)

        # Check if state exists and update if necessary
        exists = self.latest_location.exists()
        if not exists or max_row[1] > self.latest_location.get()[1]:
            # Update the state with the new max row
            # Yield the updated row
            yield pd.DataFrame(
                {"user": (max_row[0],), "time": (max_row[1],), "location": (max_row[2],)}
        # Yield an empty DataFrame if no update is needed
        yield pd.DataFrame()

    def close(self) -> None:
        # No cleanup needed

# Apply the stateful transformation to the input DataFrame
  .writeStream...  # Continue with stream writing configuration


// Define a case class to represent user location data
case class UserLocation(
    user: String,
    time: Long,
    location: String)

// Define a stateful processor for slowly changing dimension type 1 (SCD1) operations
class SCDType1StatefulProcessor extends StatefulProcessor[String, UserLocation, UserLocation] {
  import org.apache.spark.sql.{Encoders}

  // Transient value state to store the latest location for each user
  @transient private var _latestLocation: ValueState[UserLocation] = _

  // Initialize the state store
  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    // Create a value state named "locationState" using UserLocation encoder
    // TTLConfig.NONE means the state has no expiration
    _latestLocation = getHandle.getValueState[UserLocation]("locationState",
      Encoders.product[UserLocation], TTLConfig.NONE)

  // Process input rows and update state
  override def handleInputRows(
      key: String,
      inputRows: Iterator[UserLocation],
      timerValues: TimerValues): Iterator[UserLocation] = {
    // Find the location with the maximum timestamp from input rows
    val maxNewLocation = inputRows.maxBy(_.time)

    // Update state and emit output if:
    // 1. No previous state exists, or
    // 2. New location has a more recent timestamp than the stored one
    if (_latestLocation.getOption().isEmpty || maxNewLocation.time > _latestLocation.get().time) {
      Iterator.single(maxNewLocation)  // Emit the updated location
    } else {
      Iterator.empty  // No update needed, emit nothing

느린 변경 차원(SCD) 유형 2

다음 Notebook에는 Python 또는 Scala에서 transformWithState 사용하여 SCD 형식 2를 구현하는 예제가 포함되어 있습니다.

SCD 유형 2 Python

SCD 유형 2 Scala

가동 중지 시간 탐지기

transformWithState 지정된 키에 대한 레코드가 마이크로배치에서 처리되지 않더라도 경과된 시간에 따라 작업을 수행할 수 있도록 타이머를 구현합니다.

다음 예제에서는 가동 중지 시간 탐지기에 대한 패턴을 구현합니다. 지정된 키에 대해 새 값이 표시될 때마다 lastSeen 상태 값을 업데이트하고, 기존 타이머를 지우고, 미래를 위해 타이머를 다시 설정합니다.

타이머가 만료되면 애플리케이션은 키에 대해 마지막으로 관찰된 이벤트 이후 경과된 시간을 내보낸다. 그런 다음 10초 후에 업데이트를 내보내도록 새 타이머를 설정합니다.


import datetime
import time

class DownTimeDetectorStatefulProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define schema for the state value (timestamp)
        state_schema = StructType([StructField("value", TimestampType(), True)])
        self.handle = handle
        # Initialize state to store the last seen timestamp for each key
        self.last_seen = handle.getValueState("last_seen", state_schema)

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        latest_from_existing = self.last_seen.get()
        # Calculate downtime duration
        downtime_duration = timerValues.getCurrentProcessingTimeInMs() - int(time.time() * 1000)
        # Register a new timer for 10 seconds in the future
        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)
        # Yield a DataFrame with the key and downtime duration
        yield pd.DataFrame(
                "id": key,
                "timeValues": str(downtime_duration),

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Find the row with the maximum timestamp
        max_row = max((tuple(pdf.iloc[0]) for pdf in rows), key=lambda row: row[1])

        # Get the latest timestamp from existing state or use epoch start if not exists
        if self.last_seen.exists():
            latest_from_existing = self.last_seen.get()
            latest_from_existing = datetime.fromtimestamp(0)

        # If new data is more recent than existing state
        if latest_from_existing < max_row[1]:
            # Delete all existing timers
            for timer in self.handle.listTimers():
            # Update the last seen timestamp

        # Register a new timer for 5 seconds in the future
        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 5000)

        # Get current processing time in milliseconds
        timestamp_in_millis = str(timerValues.getCurrentProcessingTimeInMs())

        # Yield a DataFrame with the key and current timestamp
        yield pd.DataFrame({"id": key, "timeValues": timestamp_in_millis})

    def close(self) -> None:
        # No cleanup needed


import java.sql.Timestamp
import org.apache.spark.sql.Encoders

// The (String, Timestamp) schema represents an (id, time). We want to do downtime
// detection on every single unique sensor, where each sensor has a sensor ID.
class DowntimeDetector(duration: Duration) extends
  StatefulProcessor[String, (String, Timestamp), (String, Duration)] {

  @transient private var _lastSeen: ValueState[Timestamp] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    _lastSeen = getHandle.getValueState[Timestamp]("lastSeen", Encoders.TIMESTAMP, TTLConfig.NONE)

  // The logic here is as follows: find the largest timestamp seen so far. Set a timer for
  // the duration later.
  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Timestamp)],
      timerValues: TimerValues): Iterator[(String, Duration)] = {
    val latestRecordFromNewRows = inputRows.maxBy(_._2.getTime)

    // Use getOrElse to initiate state variable if it doesn't exist
    val latestTimestampFromExistingRows = _lastSeen.getOption().getOrElse(new Timestamp(0))
    val latestTimestampFromNewRows = latestRecordFromNewRows._2

    if (latestTimestampFromNewRows.after(latestTimestampFromExistingRows)) {
      // Cancel the one existing timer, since we have a new latest timestamp.
      // We call "listTimers()" just because we don't know ahead of time what
      // the timestamp of the existing timer is.
      getHandle.listTimers().foreach(timer => getHandle.deleteTimer(timer))

      // Use timerValues to schedule a timer using processing time.
      getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + duration.toMillis)
    } else {
      // No new latest timestamp, so no need to update state or set a timer.


  override def handleExpiredTimer(
    key: String,
    timerValues: TimerValues,
    expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, Duration)] = {
      val latestTimestamp = _lastSeen.get()
      val downtimeDuration = new Duration(
        timerValues.getCurrentProcessingTimeInMs() - latestTimestamp.getTime)

      // Register another timer that will fire in 10 seconds.
      // Timers can be registered anywhere but init()
      getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)

      Iterator((key, downtimeDuration))

기존 상태 정보 마이그레이션

다음 예제에서는 초기 상태를 허용하는 상태 저장 애플리케이션을 구현하는 방법을 보여 줍니다. 상태 저장 애플리케이션에 초기 상태 처리를 추가할 수 있지만 초기 상태는 애플리케이션을 처음 초기화할 때만 설정할 수 있습니다.

이 예제에서는 statestore 판독기를 사용하여 검사점 경로에서 기존 상태 정보를 로드합니다. 이 패턴의 사용 사례의 하나는 레거시 상태 저장 애플리케이션에서 transformWithState으로 마이그레이션하는 것입니다.


# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType
from typing import Iterator

# Set RocksDB as the state store provider for better performance
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

Input schema is as below

input_schema = StructType(
    [StructField("id", StringType(), True)],
    [StructField("value", StringType(), True)]

# Define the output schema for the streaming query
output_schema = StructType([
    StructField("id", StringType(), True),
    StructField("accumulated", StringType(), True)

class AccumulatedCounterStatefulProcessorWithInitialState(StatefulProcessor):

    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define schema for the state value (integer)
        state_schema = StructType([StructField("value", IntegerType(), True)])
        # Initialize state to store the accumulated counter for each id
        self.counter_state = handle.getValueState("counter_state", state_schema)
        self.handle = handle

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Check if state exists for the current key
        exists = self.counter_state.exists()
        if exists:
            value_row = self.counter_state.get()
            existing_value = value_row[0]
            existing_value = 0

        accumulated_value = existing_value

        # Process input rows and accumulate values
        for pdf in rows:
            value = pdf["value"].astype(int).sum()
            accumulated_value += value

        # Update the state with the new accumulated value

        # Yield a DataFrame with the key and accumulated value
        yield pd.DataFrame({"id": key, "accumulated": str(accumulated_value)})

    def handleInitialState(self, key, initialState, timerValues) -> None:
        # Initialize the state with the provided initial value
        init_val = initialState.at[0, "initVal"]

    def close(self) -> None:
        # No cleanup needed

# Load initial state from a checkpoint directory
initial_state = spark.read.format("statestore")
  .option("path", "$checkpointsDir")

# Apply the stateful transformation to the input DataFrame
  .writeStream...  # Continue with stream writing configuration


// Import necessary libraries
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._

// Define a stateful processor that can handle initial state
class InitialStateStatefulProcessor extends StatefulProcessorWithInitialState[String, (String, String, String), (String, String), (String, Int)] {
  // Transient value state to store the accumulated value
  @transient protected var valueState: ValueState[Int] = _

  // Initialize the state store
  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    // Create a value state named "valueState" using Int encoder
    // TTLConfig.NONE means the state has no automatic expiration
    valueState = getHandle.getValueState[Int]("valueState",
      Encoders.scalaInt, TTLConfig.NONE)

  // Process input rows and update state
  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, String, String)],
      timerValues: TimerValues): Iterator[(String, String)] = {
    var existingValue = 0
    // Retrieve existing value from state if it exists
    if (valueState.exists()) {
      existingValue += valueState.get()
    var accumulatedValue = existingValue
    // Accumulate values from input rows
    for (row <- inputRows) {
      accumulatedValue += row._2.toInt
    // Update the state with the new accumulated value
    // Return the key and accumulated value as a string
    Iterator((key, accumulatedValue.toString))

  // Handle initial state when provided
  override def handleInitialState(
      key: String, initialState: (String, Int), timerValues: TimerValues): Unit = {
    // Update the state with the initial value

초기화를 위해 델타 테이블을 상태 저장소로 마이그레이션

다음 Notebook에는 Python 또는 Scala에서 transformWithState 사용하여 Delta 테이블에서 상태 저장소 값을 초기화하는 예제가 포함되어 있습니다.

Delta Python에서 상태 초기화

Delta Scala에서 상태 초기화

세션 추적

다음 Notebook에는 Python 또는 Scala에서 transformWithState 사용하여 세션 추적의 예가 포함되어 있습니다.

세션 추적 Python

Scala 세션 추적

transformWithState을 사용하여 맞춤형 스트림-스트림 조인

다음 코드는 transformWithState을 사용하여 여러 스트림 간의 맞춤형 스트림 조인을 보여 줍니다. 다음과 같은 이유로 기본 제공 조인 연산자 대신 이 방법을 사용할 수 있습니다.

  • 스트림 스트림 조인을 지원하지 않는 업데이트 출력 모드를 사용해야 합니다. 이는 짧은 대기 시간 애플리케이션에 특히 유용합니다.
  • 워터마크가 만료된 후 늦게 도착하는 행에 대한 조인을 계속 수행해야 합니다.
  • 여러 스트림 간의 다대다 조인을 수행해야 합니다.

이 예제에서는 사용자에게 상태 만료 논리에 대한 모든 권한을 부여하여 동적 보존 기간 확장이 워터마크 이후에도 순서가 다른 이벤트를 처리할 수 있도록 합니다.


# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from typing import Iterator

# Define output schema for the joined data
output_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("profile_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("preferred_category", StringType(), True)

class CustomStreamJoinProcessor(StatefulProcessor):
    # Initialize stateful storage for user profiles, preferences, and event tracking.
    def init(self, handle: StatefulProcessorHandle) -> None:

        # Define schemas for different types of state data
        profile_schema = StructType([
            StructField("name", StringType(), True),
            StructField("email", StringType(), True),
            StructField("updated_at", TimestampType(), True)
        preferences_schema = StructType([
            StructField("preferred_category", StringType(), True),
            StructField("updated_at", TimestampType(), True)
        activity_schema = StructType([
            StructField("event_type", StringType(), True),
            StructField("timestamp", TimestampType(), True)

        # Initialize state storage for user profiles, preferences, and activity
        self.profile_state = handle.getMapState("user_profiles", "string", profile_schema)
        self.preferences_state = handle.getMapState("user_preferences", "string", preferences_schema)
        self.activity_state = handle.getMapState("user_activity", "string", activity_schema)

    # Process incoming events and update state
    def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timer_values) -> Iterator[pd.DataFrame]:
        df = pd.concat(rows, ignore_index=True)
        output_rows = []

        for _, row in df.iterrows():
            user_id = row["user_id"]

            if "event_type" in row:  # User activity event
                self.activity_state.update_value(user_id, row.to_dict())
                # Set a timer to process this event after a 10-second delay
                self.getHandle().registerTimer(timer_values.get_current_processing_time_in_ms() + (10 * 1000))

            elif "name" in row:  # Profile update
                self.profile_state.update_value(user_id, row.to_dict())

            elif "preferred_category" in row:  # Preference update
                self.preferences_state.update_value(user_id, row.to_dict())

        # No immediate output; processing will happen when timer expires
        return iter([])

    # Perform lookup after delay, handling out-of-order and late-arriving events.
    def handleExpiredTimer(self, key, timer_values, expired_timer_info) -> Iterator[pd.DataFrame]:

        # Retrieve stored state for the user
        user_activity = self.activity_state.get_value(key)
        user_profile = self.profile_state.get_value(key)
        user_preferences = self.preferences_state.get_value(key)

        if user_activity:
            # Combine data from different states into a single output row
            output_row = {
                "user_id": key,
                "event_type": user_activity["event_type"],
                "timestamp": user_activity["timestamp"],
                "profile_name": user_profile.get("name") if user_profile else None,
                "email": user_profile.get("email") if user_profile else None,
                "preferred_category": user_preferences.get("preferred_category") if user_preferences else None
            return iter([pd.DataFrame([output_row])])

        return iter([])

    def close(self) -> None:
        # No cleanup needed

# Apply transformWithState to the input DataFrame
  .writeStream...  # Continue with stream writing configuration


// Import necessary libraries
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.TimestampType
import java.sql.Timestamp

// Define a case class for enriched user events, combining user activity with profile and preference data
case class EnrichedUserEvent(
    user_id: String,
    event_type: String,
    timestamp: Timestamp,
    profile_name: Option[String],
    email: Option[String],
    preferred_category: Option[String]

// Custom stateful processor for stream-stream join
class CustomStreamJoinProcessor extends StatefulProcessor[String, UserEvent, EnrichedUserEvent] {
  // Transient state variables to store user profiles, preferences, and activities
  @transient private var _profileState: MapState[String, UserProfile] = _
  @transient private var _preferencesState: MapState[String, UserPreferences] = _
  @transient private var _activityState: MapState[String, UserEvent] = _

  // Initialize state stores
  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    _profileState = getHandle.getMapState[String, UserProfile]("profileState", Encoders.product[UserProfile], TTLConfig.NONE)
    _preferencesState = getHandle.getMapState[String, UserPreferences]("preferencesState", Encoders.product[UserPreferences], TTLConfig.NONE)
    _activityState = getHandle.getMapState[String, UserEvent]("activityState", Encoders.product[UserEvent], TTLConfig.NONE)

  // Handle incoming user events
  override def handleInputRows(
      key: String,
      inputRows: Iterator[UserEvent],
      timerValues: TimerValues): Iterator[EnrichedUserEvent] = {

    inputRows.foreach { event =>
      if (event.event_type.nonEmpty) {
        // Update activity state and set a timer for 10 seconds in the future
        _activityState.update(key, event)
        getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)

  // Handle expired timers to produce enriched events
  override def handleExpiredTimer(
      key: String,
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[EnrichedUserEvent] = {

    // Retrieve user data from state stores
    val userEvent = _activityState.getOption(key)
    val userProfile = _profileState.getOption(key)
    val userPreferences = _preferencesState.getOption(key)

    if (userEvent.isDefined) {
      // Create and return an enriched event if user activity exists
      val enrichedEvent = EnrichedUserEvent(
        user_id = key,
        event_type = userEvent.get.event_type,
        timestamp = userEvent.get.timestamp,
        profile_name = userProfile.map(_.name),
        email = userProfile.map(_.email),
        preferred_category = userPreferences.map(_.preferred_category)
    } else {

// Apply the custom stateful processor to the input DataFrame
val enrichedStream = df
    new CustomStreamJoinProcessor(),

// Write the enriched stream to Delta Lake
  .option("checkpointLocation", "/mnt/delta/checkpoints")

Top-K 계산

다음 예제에서는 우선 순위 큐가 있는 ListState 사용하여 각 그룹 키에 대한 스트림의 상위 K 요소를 거의 실시간으로 유지 관리하고 업데이트합니다.

Top-K Python

Top-K 스칼라

