为什么需要 StateBackend?—— 职责分离原则
我们可以用一个银行的例子来类比:
State(如ValueState,ListState) 就像是你的银行卡。AbstractKeyedStateBackend就像是银行的整个后台系统(包括总服务器、数据库、风控系统、会计系统等)。
你不能直接用一张塑料卡片去操作你的钱,你需要把卡片插入 ATM 机或交给柜员,由他们背后的银行系统来完成真正的存取款、转账等操作。
AbstractKeyedStateBackend 的存在正是为了实现这种职责分离:
State 接口的职责(银行卡):
- 定义用户交互的契约:提供一组简单、清晰的 API 给用户使用,比如
value(),update(),add(),clear()。它只关心“做什么”,不关心“怎么做”。
AbstractKeyedStateBackend 的职责(银行系统):
它是一个庞大而复杂的“状态引擎”,负责所有底层的、与具体实现相关的脏活累活。
- 生命周期管理:负责所有状态的创建、初始化和销毁 (
dispose)。 - 持久化与容错(核心):实现快照 (
snapshot) 和恢复逻辑。这是 Flink 实现 Exactly-Once 的基石。单个State对象自身无法完成复杂的分布式快照。 - 物理存储交互:它才是真正与 RocksDB、堆内存(Heap)等物理存储打交道的组件。它管理着数据库连接、Column Family、读写选项等。
- Key/Namespace 管理:管理
keySerializer,计算当前 key 属于哪个 Key Group (KeyGroupRangeAssignment),处理不同namespace下的状态隔离。 - 中央缓存与优化:如您所见,它内部有
lastName和lastState这样的缓存机制,用于优化对同一状态的连续访问。 - 应用横切关注点(AOP):它是一个中心化的工厂,可以在创建
State时,统一应用 TTL、Metrics 监控等功能。
看 getOrCreateKeyedState 这段代码,它完美地展示了 StateBackend 作为“工厂”和“管理者”的角色:
// ... existing code ...@Override@SuppressWarnings("unchecked")public <N, S extends State, V> S getOrCreateKeyedState(final TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)throws Exception {
// ... existing code ...InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());if (kvState == null) {if (!stateDescriptor.isSerializerInitialized()) {stateDescriptor.initializeSerializerUnlessSet(executionConfig);}// 这里是关键:一个装饰器链条// Backend作为工厂,负责创建原始State,并用TTL、Metrics等功能进行包装kvState =MetricsTrackingStateFactory.createStateAndWrapWithMetricsTrackingIfEnabled(TtlStateFactory.createStateAndWrapWithTtlIfEnabled(namespaceSerializer, stateDescriptor, this, ttlTimeProvider),this,stateDescriptor,latencyTrackingStateConfig,sizeTrackingStateConfig);keyValueStatesByName.put(stateDescriptor.getName(), kvState);publishQueryableStateIfEnabled(stateDescriptor, kvState);}return (S) kvState;}
// ... existing code ...
结论:如果直接使用 State,就意味着每一个 State 对象都需要自己实现一套完整的快照、恢复、缓存、物理存储交互逻辑。这将导致代码极度冗余、混乱且难以维护。AbstractKeyedStateBackend 将这些公共的、复杂的底层逻辑全部收敛,使得 State 对象可以保持为一个轻量级的、只关注业务逻辑的句柄。
State 反过来引用 Backend,这并非传统意义上需要避免的耦合,而是一种委托(Delegation)。我们来梳理一下这个流程:
- 创建:
Backend创建了一个具体的State实现类(比如RocksDBValueState)。 - 持有引用:在创建
RocksDBValueState时,Backend会把自身的引用 (this) 传递给RocksDBValueState的构造函数。因此,这个State实例从诞生起就知道“是谁创造了我”、“我应该向谁汇报”。
比如update
public void update(V value) throws IOException {if (value == null) {clear();return;}try {backend.db.put(columnFamily,writeOptions,serializeCurrentKeyWithGroupAndNamespace(),serializeValue(value));} catch (RocksDBException e) {throw new IOException("Error while adding data to RocksDB", e);}}虽然直接调用了 backend.db.put(...),但我们仔细分析一下它所需要的所有参数,就会发现委托模式的本质依然存在:
- columnFamily: 这个 ColumnFamilyHandle 是从哪里来的?它是在 RocksDBValueState 被创建时,由 backend 传入的。State 自己不管理 Column Family 的生命周期。
- writeOptions: 这个 WriteOptions 对象同样是 backend 的成员变量,由 backend 统一配置和管理。
- serializeCurrentKeyWithGroupAndNamespace(): 这是最关键的一步。这个方法内部需要:
- backend.getCurrentKey(): 获取当前正在处理的 Key。
- backend.getCurrentKeyGroupIndex(): 计算 Key Group。
- getNamespaceSerializer(): 获取 Namespace 序列化器。
- backend.getKeySerializer(): 获取 Key 序列化器。 这些核心的上下文信息和组件(序列化器),全部是由 backend 提供的。State 对象本身是无状态的(stateless in terms of context),它不知道当前在为哪个 key 工作,必须向 backend 查询。
- serializeValue(value): 这个方法内部需要 getValueSerializer(),而这个序列化器也是在创建时由 backend 提供的。
所以,即使 State 执行了最后那一下 put 操作,它也像一个“一线工人”,虽然亲手把螺丝拧上去了,但这个螺丝(value)、螺丝刀(writeOptions)、图纸(columnFamily)以及拧在哪个位置(key 和 namespace),全部是由 backend 这个“车间主任”提供的。
这是一种更细粒度的委托:State 被委托了“如何将序列化好的 key 和 value 放入指定的 Column Family”这个具体的执行逻辑,但它依然将“获取所有执行前提条件(上下文、资源、配置)”这项更重要的职责委托给了 backend。
Subtask、RocksDB 实例、窗口和 Namespace 的关系
Operator 的一个 Subtask 实例 对应一个独立的 RocksDB 实例。
让我们把这个关系链梳理清楚:
- 一个 Flink Job:可以包含多个 Operator(
map,filter,keyBy等)。 - 一个 Operator:可以有多个并行的 Subtask 实例(并行度决定)。
- 一个 Subtask 实例:运行在一个 TaskManager 的一个 Slot 中。
- 一个
RocksDBKeyedStateBackend实例:每个有状态的 Subtask 实例都会创建一个自己的RocksDBKeyedStateBackend对象。 - 一个 RocksDB 数据库实例:每个
RocksDBKeyedStateBackend都会在 TaskManager 的本地磁盘上创建一个独立的 RocksDB 数据库目录和实例(db对象)。
所以,如果你的一个 window 操作的并行度是 10,那么就会有 10 个 Subtask,对应 10 个 RocksDBKeyedStateBackend 实例,进而在不同的 TaskManager 上创建 10 个独立的 RocksDB 数据库。它们之间物理隔离,互不干扰。
那么窗口和 Namespace 是什么关系?
在一个 Subtask 内部(也就是在一个 RocksDB 实例内部),Namespace 是用来在逻辑上区分不同窗口的状态的。
mergeNamespaces 就是最好的例子。当会话窗口需要合并时:
sourcenamespaces 就是旧的、待合并的窗口的标识符。targetnamespace 就是合并后的新窗口的标识符。
这些 namespace 和用户的 key 组合在一起,构成了 RocksDB 中真正的 key。
总结:
- 物理隔离:不同的 Subtask 通过拥有各自独立的 RocksDB 实例来实现物理隔离。
- 逻辑隔离:在同一个 Subtask(同一个 RocksDB 实例)内部,不同的窗口(或其它需要隔离的场景,如
ProcessFunction中的不同 Timer)通过namespace来实现逻辑隔离。
所有 State 如何共享 DB 并互相区分?—— Column Family
在一个 RocksDBKeyedStateBackend 内部,所有不同名称的 State(比如你在一个 ProcessFunction 中定义了 ValueState<Integer>、ListState<String> 和 MapState<Long, Double>)是共享同一个 RocksDB 实例的。
那它们的数据是如何区分,不会混在一起的呢?答案是:列族(Column Family)。
Column Family 是 RocksDB 中用于隔离数据的逻辑命名空间,可以把它想象成关系型数据库中的一张张独立的表。
我们来看 RocksDBKeyedStateBackend.java 中的关键实现:
当一个 State 首次被创建时,RocksDBKeyedStateBackend 会为它做两件事:
- 创建一个新的 Column Family:每个
StateDescriptor的唯一名称(stateDesc.getName())会被用来命名一个新的 Column Family。 - 注册元信息:将这个 State 的名称、序列化器信息以及它对应的
ColumnFamilyHandle存储在一个Map中,也就是kvStateInformation。
// ... existing code .../*** Information about the k/v states, maintained in the order as we create them. This is used to* retrieve the column family that is used for a state and also for sanity checks when* restoring.*/private final LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
// ... existing code ...
当具体的 State 对象(如 RocksDBValueState)执行读写操作时,它会从 backend 获取自己专属的 ColumnFamilyHandle,并将其作为参数传递给 db.get()、db.put() 或 db.merge() 等方法。
// RocksDBValueState.java 中的 value() 方法
byte[] valueBytes = backend.db.get(columnFamily, serializeCurrentKeyWithGroupAndNamespace());
这样一来,虽然所有的 State 都在同一个 db 对象上操作,但由于它们使用了不同的 columnFamily,数据就被天然地隔离在了不同的“表”里,绝不会互相干扰。
这种设计的优势是什么?
- 资源共享:所有 Column Family 共享同一个 MemTable、Write-Ahead-Log (WAL)、Block Cache 等核心 RocksDB 资源。这大大减少了内存开销和管理成本,而不是为每个 State 都启动一个完整的 DB 实例。
- 原子写入:可以通过
WriteBatch实现跨多个 Column Family 的原子写入,这对于保证 Flink 复杂操作的原子性至关重要。 - 统一快照:可以对整个 RocksDB 实例(包含所有 Column Family)进行一次统一的、物理一致性的快照,极大地简化了 Checkpoint 的实现。
ColumnFamilyDescriptor
ColumnFamilyDescriptor 是 RocksDB Java API 的一部分,它本质上是一个列族(Column Family)的描述符,包含了创建列族所需的名称和配置选项 (ColumnFamilyOptions)。
在 Flink 中,ColumnFamilyDescriptor 的构建主要通过 RocksDBOperationUtils.createColumnFamilyDescriptor 这个静态方法来完成。
我们来看一下这个方法的实现:
RocksDBOperationUtils.java
// ... existing code ...public static ColumnFamilyDescriptor createColumnFamilyDescriptor(RegisteredStateMetaInfoBase metaInfoBase,Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,@Nullable Long writeBufferManagerCapacity) {byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),"The chosen state name 'default' collides with the name of the default column family!");ColumnFamilyOptions options =createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());if (ttlCompactFiltersManager != null) {ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);}if (writeBufferManagerCapacity != null) {// It'd be great to perform the check earlier, e.g. when creating write buffer manager.// Unfortunately the check needs write buffer size that was just calculated.sanityCheckArenaBlockSize(options.writeBufferSize(),options.arenaBlockSize(),writeBufferManagerCapacity);}return new ColumnFamilyDescriptor(nameBytes, options);}
// ... existing code ...
从代码中我们可以清晰地看到构建 ColumnFamilyDescriptor 的步骤:
获取列族名称:
- 从传入的
RegisteredStateMetaInfoBase对象中获取 State 的名称 (metaInfoBase.getName())。 - 将这个名称转换为字节数组 (
byte[] nameBytes)。这是因为 RocksDB 的原生 API 使用字节数组来标识列族。 - 这里有一个检查,确保 State 的名称不是 "default",以避免与 RocksDB 的默认列族冲突。
- 从传入的
创建列族配置 (
ColumnFamilyOptions):- 调用
createColumnFamilyOptions方法,这个方法会使用columnFamilyOptionsFactory来生成一个ColumnFamilyOptions实例。 - 这个
columnFamilyOptionsFactory正是我们在EmbeddedRocksDBStateBackend中看到的那个函数:stateName -> resourceContainer.getColumnOptions()。它为每个 State 提供了基础的列族配置。
- 调用
(可选)配置 TTL 压缩过滤器:
- 如果
ttlCompactFiltersManager不为 null,会检查当前 State 是否配置了 TTL(Time-to-Live,生存时间)。 - 如果配置了 TTL,它会为这个列族的
ColumnFamilyOptions设置一个特定的压缩过滤器(Compaction Filter),这个过滤器会在 RocksDB 的后台压缩过程中自动清理过期的数据。
- 如果
(可选)内存检查:
- 如果传入了
writeBufferManagerCapacity,会进行一个健全性检查,确保arenaBlockSize的配置是合理的。
- 如果传入了
实例化
ColumnFamilyDescriptor:- 最后,使用前面准备好的列族名称字节数组和配置好的
ColumnFamilyOptions对象,通过new ColumnFamilyDescriptor(nameBytes, options)来创建一个新的ColumnFamilyDescriptor实例并返回。
- 最后,使用前面准备好的列族名称字节数组和配置好的
综上所述,一个 ColumnFamilyDescriptor 对象主要包含以下两个核心信息:
列族名称 (Column Family Name):
- 以
byte[]数组的形式存储。 - 这个名称直接来源于 Flink State 的
StateDescriptor中定义的名字。例如,new ValueStateDescriptor<>("my-state", String.class)中的"my-state"。
- 以
列族选项 (Column Family Options):
- 一个
org.rocksdb.ColumnFamilyOptions对象。 - 这个对象包含了该列族所有详细的配置参数,例如:
- Merge Operator: 用于处理 ListState、AggregatingState 等需要合并操作的状态。
- Write Buffer Size: 写缓冲区大小。
- Compression Type: 压缩算法(如 Snappy, LZ4)。
- Compaction Filter: 压缩过滤器,如用于实现 TTL 的过滤器。
- 以及其他大量控制 RocksDB 行为的底层参数。
- 一个
这个 ColumnFamilyDescriptor 对象随后会被传递给 db.createColumnFamily() 方法,RocksDB 会根据其中的名称和配置信息,在数据库实例中创建一个新的、隔离的列族。
Namespace 可以自己随意指定类型和值吗?
答案是:是的,几乎可以。
任意的类型 (Type): 在 Flink 的状态接口定义中,命名空间(Namespace)是一个泛型参数
N,例如InternalKvState<K, N, V>。这意味着理论上你可以使用任何 Java/Scala 类型作为Namespace的类型,比如String,Long,Integer,甚至是自定义的 POJO 对象。唯一的硬性要求是:Flink 必须知道如何序列化和反序列化你的 Namespace 类型。你需要为该类型提供一个
TypeSerializer<N>。对于Long,String等基础类型,Flink 会自动推断并使用内置的序列化器。对于自定义的 POJO,你需要确保它符合 Flink 的 POJO 规范,或者手动创建一个TypeSerializer。任意的值 (Value): 一旦确定了
Namespace的类型,你就可以在代码中通过调用state.setCurrentNamespace(namespace)来传入该类型的任意实例作为当前操作的命名空间。这正是你之前实现的
NamespacedStateListView的核心思想:将用户的UID(无论是Long还是String类型)作为namespace的值,从而为每个UID创建了一个逻辑上独立的ListState。AbstractRocksDBState.java文件中的setCurrentNamespace方法就证明了这一点:// ... existing code ... /** The current namespace, which the next value methods will refer to. */ private N currentNamespace; // ... existing code ... @Override public void setCurrentNamespace(N namespace) {this.currentNamespace = namespace; } // ... existing code ...这个方法简单地将传入的
namespace对象赋值给内部的currentNamespace字段,后续的状态操作(如get,add,clear)都会使用这个字段来构建最终存储到 RocksDB 的 key。
和 KeyedStream,Window 的关系
这里需要分开来看:
与 KeyedStream 的关系:强依赖关系
- 必须在
KeyedStream上使用:所有带Namespace的状态(InternalKvState)都属于Keyed State。你必须先通过dataStream.keyBy(...)将数据流转换成KeyedStream,然后才能在下游的算子(如ProcessFunction)中使用这些状态。 - 原因:Flink 的状态是根据
keyBy指定的 Key (K) 来进行分区和管理的。Namespace(N) 只是在某个特定 Key (K) 的状态内部做的进一步划分。可以理解为一种二级索引或子分区。没有keyBy提供的一级分区,Namespace就无从谈起。
与 Window 的关系:没有必然关系,窗口是 Namespace 的一种应用场景
窗口是 Namespace 的使用者,而非前提:可以把窗口(Window)看作是 Flink 框架自身对
Namespace机制的一种自动化应用。当你使用窗口操作时(例如.window(TumblingEventTimeWindows.of(...))),Flink 会自动地:- 为每一个窗口实例(比如
[00:00:05, 00:00:10)这个时间窗口)创建一个TimeWindow对象。 - 在处理属于该窗口的数据时,自动调用
setCurrentNamespace(),并将这个TimeWindow对象作为namespace传入。 这样,窗口内的所有状态计算就被天然地隔离在了这个TimeWindow命名空间下。
- 为每一个窗口实例(比如
可以完全脱离窗口使用 Namespace:在一个普通的
KeyedProcessFunction中,完全没有使用任何窗口操作,而是通过手动调用setCurrentNamespace(uid)来实现了自定义的状态划分。这赋予了超越窗口框架的、更细粒度的状态管理能力。
概念 | 与 Namespace 的关系 | 解释 |
|---|---|---|
KeyedStream | 强依赖 | 必须先 |
Window | 无直接依赖 | 窗口是 |
所以,可以自定义 Namespace 的类型和值,并且这个机制可以独立于 Flink 的窗口(Window)功能来使用,只要你的操作是建立在 KeyedStream 之上即可。
window设置NameSpace
负责调用 setCurrentNamespace 的是窗口算子(WindowOperator)。WindowOperator 的工作流程是:
- 接收到一条数据。
- 调用
WindowAssigner的assignWindows方法,获取这条数据所属的窗口列表。 - 遍历这个窗口列表。
- 对于列表中的每一个窗口,先调用
state.setCurrentNamespace(window)将当前状态的上下文切换到这个窗口。 - 然后,再对该窗口的状态进行更新(比如累加、添加元素等)。
我们可以从工程代码中找到清晰的证据:
在 WindowOperator.java 中,你可以看到这个完整的逻辑:
// ... existing code ...public void processElement(StreamRecord<RowData> record) throws Exception {
// ... existing code ...timestamp = TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone);// 1. 调用 assigner 获取窗口列表// the windows which the input row should be placed intoCollection<W> affectedWindows = windowFunction.assignStateNamespace(inputRow, timestamp);boolean isElementDropped = true;// 2. 遍历窗口列表for (W window : affectedWindows) {isElementDropped = false;// 3. 为每个窗口设置 NamespacewindowState.setCurrentNamespace(window);// 4. 更新状态RowData acc = windowState.value();if (acc == null) {acc = windowAggregator.createAccumulators();}windowAggregator.setAccumulators(window, acc);if (RowDataUtil.isAccumulateMsg(inputRow)) {windowAggregator.accumulate(inputRow);} else {windowAggregator.retract(inputRow);}acc = windowAggregator.getAccumulators();windowState.update(acc);}
// ... existing code ...
另一个例子在处理 Python UDAF 的算子中也可以看到同样的设计模式:
StreamArrowPythonGroupWindowAggregateFunctionOperator.java
// ... existing code ...public void bufferInput(RowData input) throws Exception {
// ... existing code ...// 1. 调用 assigner 获取窗口列表// Given the timestamp and element, returns the set of windows into which it// should be placed.elementWindows = windowAssigner.assignWindows(input, timestamp);// 2. 遍历窗口列表for (W window : elementWindows) {if (RowDataUtil.isAccumulateMsg(input)) {// 3. 为每个窗口设置 NamespacewindowAccumulateData.setCurrentNamespace(window);// 4. 更新状态windowAccumulateData.add(input);} else {windowRetractData.setCurrentNamespace(window);windowRetractData.add(input);}}}
// ... existing code ...