Kadir Ozdemir created PHOENIX-7001:
--------------------------------------
Summary: Change Data Capture leveraging Max Lookback and Uncovered Indexes
Key: PHOENIX-7001
URL: https://issues.apache.org/jira/browse/PHOENIX-7001
Project: Phoenix
Issue Type: Improvement
Reporter: Kadir Ozdemir
The use cases for a Change Data Capture (CDC) feature are centered around
capturing changes to a given table (or updatable view) as these changes happen,
in near real-time. A CDC application can retrieve changes in real time or with
some delay, or can even retrieve the same set of changes multiple times. This
means the CDC use case can be generalized as time range queries, where the time
range is typically short, such as the last x minutes or hours, or is a specific
time range within the last n days, where n is typically less than 7.
A change is an update to a row. That is, a change either updates one or more
columns of a given row of a table or deletes a row. It is desirable to provide
these changes in the order of their arrival. One can visualize the delivery of
these changes as a stream from a Phoenix table to the application, initiated by
the application in the same way as the delivery of any other Phoenix query
results. The difference is that a regular query result includes at most one
result row for each row satisfying the query and does not include deleted rows,
while the CDC stream/result can include multiple result rows for each row and
includes deleted rows. Some use cases also need to get the pre and/or post image
of the row along with a change on the row.
The design proposed here leverages Phoenix Max Lookback and Uncovered (Global
or Local) Indexes. The max lookback feature retains recent changes to a table,
typically the changes made in the last x days. This means that the max lookback
feature already captures the changes to a given table. Currently, the max
lookback age is configurable only at the cluster level. We need to extend this
capability so that the max lookback age can be configured at the table level,
allowing each table to have a different max lookback age based on its CDC
application requirements.
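Since a CDC reader can only see the changes that max lookback still retains, a reader should check that the start of its requested time range is not older than the table's max lookback age. A minimal Java sketch of that check, assuming a per-table max lookback age expressed in seconds; the class and method names are hypothetical and not part of Phoenix:

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not a Phoenix API) for checking a CDC range against max lookback. */
final class MaxLookbackCheck {
    private MaxLookbackCheck() {}

    /**
     * Returns true if changes at or after startMillis are still guaranteed to be retained,
     * i.e. startMillis is not older than nowMillis minus the table's max lookback age.
     */
    static boolean isRetained(long startMillis, long nowMillis, long maxLookbackAgeSeconds) {
        long oldestRetainedMillis = nowMillis - TimeUnit.SECONDS.toMillis(maxLookbackAgeSeconds);
        return startMillis >= oldestRetainedMillis;
    }
}
```

For example, a table whose CDC application needs 7 days of history would need a max lookback age of at least 604800 seconds. The check is conservative: versions older than the window are only removed at compaction time, so they may still be visible for a while, but they should not be relied on.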
To deliver the changes in the order of their arrival, we need a time-based
index. This index should be uncovered, as the changes are already retained in
the table by the max lookback feature. The arrival time can be defined as the
mutation timestamp generated by the server, or as a user-specified timestamp
(or any other long integer) column. An uncovered index would give us efficient,
ordered access to the changes. Changes to an index table are also preserved by
the max lookback feature.
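The ordering argument can be illustrated with an in-memory model: if each uncovered index row holds just the arrival timestamp followed by the data table row key, then a range scan over the index returns the changed data rows in arrival order, with a row appearing once per change. The sketch below uses a Java TreeSet to stand in for the sorted index table; it illustrates that assumption and is not how Phoenix actually encodes index row keys.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/**
 * In-memory model of a time-based uncovered index: each entry holds only the
 * arrival timestamp and the data table row key (no covered columns).
 */
final class TimeIndexModel {
    record IndexKey(long timestamp, String dataRowKey) {}

    // Sort by timestamp first, then by data row key, like an index row key
    // whose leading column is the arrival time.
    private final NavigableSet<IndexKey> index = new TreeSet<>(
            Comparator.comparingLong(IndexKey::timestamp).thenComparing(IndexKey::dataRowKey));

    void recordMutation(long timestamp, String dataRowKey) {
        index.add(new IndexKey(timestamp, dataRowKey));
    }

    /** Data row keys changed in [start, end), in arrival order (a row repeats per change). */
    List<String> changedRows(long start, long end) {
        List<String> rows = new ArrayList<>();
        // "" is the smallest String, so these bounds cover every key with start <= ts < end.
        for (IndexKey k : index.subSet(new IndexKey(start, ""), true, new IndexKey(end, ""), false)) {
            rows.add(k.dataRowKey());
        }
        return rows;
    }
}
```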
A CDC feature can be composed of the following components:
* {*}CDCUncoveredIndexRegionScanner{*}: This is a server-side scanner on an
uncovered index used for CDC. It can extend UncoveredIndexRegionScanner. It
goes through index table rows using a raw scan to identify data table rows and
retrieves these rows using a raw scan. For the given time range, it forms a JSON
blob that represents the changes to each row, including pre and/or post row
images.
* {*}CDC Query Compiler{*}: This is a client-side component. It prepares the
scan object based on the given CDC query statement.
* {*}CDC DDL Compiler{*}: This is a client-side component. Based on the given
CDC DDL statement, it creates the time-based uncovered (global/local) index and
a virtual table of CDC type. CDC will be a new table type.
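To make the scanner's join step concrete, here is a simplified in-memory sketch of what CDCUncoveredIndexRegionScanner could compute for one index row: given the data row's versions (as a raw scan would expose them), it derives the pre image, the post image, and the changed columns at the index row's timestamp. It assumes each version is a full row image and that a null image marks a delete; real HBase cells are stored per column, and none of the names below are Phoenix APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;

/** Simplified model of joining one index entry with the data row's versions. */
final class CdcJoinModel {
    record Change(long timestamp, String rowKey, Map<String, String> pre,
                  Map<String, String> post, Map<String, String> changed, boolean isDelete) {}

    // rowKey -> (timestamp -> full row image, or null for a delete)
    private final Map<String, NavigableMap<Long, Map<String, String>>> versions = new HashMap<>();

    void putVersion(String rowKey, long ts, Map<String, String> image) {
        versions.computeIfAbsent(rowKey, k -> new TreeMap<>()).put(ts, image);
    }

    /** Builds the change for the index entry (ts, rowKey). */
    Change changeAt(String rowKey, long ts) {
        NavigableMap<Long, Map<String, String>> rowVersions =
                versions.getOrDefault(rowKey, new TreeMap<>());
        // Pre image: the latest version strictly before ts (null if none or if deleted).
        Map.Entry<Long, Map<String, String>> before = rowVersions.lowerEntry(ts);
        Map<String, String> pre = before == null ? null : before.getValue();
        // Post image: the version written at ts (null for a delete).
        Map<String, String> post = rowVersions.get(ts);
        boolean isDelete = rowVersions.containsKey(ts) && post == null;
        Map<String, String> changed = new HashMap<>();
        if (post != null) {
            for (Map.Entry<String, String> e : post.entrySet()) {
                // A column is part of the change if it is new or differs from the pre image.
                // Columns dropped from the image are not reported in this simplified model.
                if (pre == null || !Objects.equals(pre.get(e.getKey()), e.getValue())) {
                    changed.put(e.getKey(), e.getValue());
                }
            }
        }
        return new Change(ts, rowKey, pre, post, changed, isDelete);
    }
}
```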
A CDC DDL statement to create CDC on a (data) table can look as follows:
Create CDC <CDC Table Name> on <Data Table Name>
(PHOENIX_ROW_TIMESTAMP() | <Data Table Column>)
INCLUDE (pre | post | latest | all) TTL = <Age in seconds>
INDEX = <GLOBAL | LOCAL> SALT_BUCKETS=<salt bucket count>
The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC
table PK columns start with the timestamp or user defined column and continue
with the data table PK columns. The CDC table includes one non-PK column,
which is a JSON column. The change can be expressed in this JSON column in
different ways, depending on the CDC DDL or query statement. The change can be
expressed as just the mutation for the change, the latest image of the row, the
pre image of the row (the image before the change), the post image, or any
combination of these. The CDC table is not a physical table on disk. It is just
a virtual table to be used in CDC queries. Phoenix stores only the metadata for
this virtual table.
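The JSON column could be assembled along these lines: the parts selected by the CDC DDL (or overridden by a query hint) become fields of one JSON object. A hand-rolled Java sketch, with hypothetical field names (change, pre_image, post_image, latest_image) and minimal string escaping in place of a real JSON library:

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Sketch of building the CDC table's single JSON column; field names are hypothetical. */
final class CdcJsonColumn {
    enum Include { CHANGE, PRE, POST, LATEST }

    static String build(Set<Include> include, Map<String, String> change,
                        Map<String, String> pre, Map<String, String> post,
                        Map<String, String> latest) {
        StringBuilder sb = new StringBuilder("{");
        appendField(sb, include.contains(Include.CHANGE), "change", change);
        appendField(sb, include.contains(Include.PRE), "pre_image", pre);
        appendField(sb, include.contains(Include.POST), "post_image", post);
        appendField(sb, include.contains(Include.LATEST), "latest_image", latest);
        return sb.append('}').toString();
    }

    private static void appendField(StringBuilder sb, boolean selected, String name,
                                    Map<String, String> image) {
        if (!selected) {
            return;
        }
        if (sb.length() > 1) {
            sb.append(','); // separate from a previously written field
        }
        sb.append(quote(name)).append(':');
        if (image == null) {
            sb.append("null"); // e.g. no pre image for an insert, no post image for a delete
            return;
        }
        sb.append('{');
        boolean first = true;
        // Sort column names so the output is deterministic.
        for (Map.Entry<String, String> e : new TreeMap<>(image).entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append(quote(e.getKey())).append(':')
              .append(e.getValue() == null ? "null" : quote(e.getValue()));
        }
        sb.append('}');
    }

    // Minimal JSON string escaping: quotes, backslashes and control characters.
    private static String quote(String s) {
        StringBuilder out = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                out.append('\\').append(c);
            } else if (c < 0x20) {
                out.append(String.format("\\u%04x", (int) c));
            } else {
                out.append(c);
            }
        }
        return out.append('"').toString();
    }
}
```

A DDL with INCLUDE (pre) would correspond to calling build with EnumSet.of(Include.PRE); a query hint would simply pass a different set.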
A CDC query can be as follows:
Select * from <CDC Table Name> where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) AND
PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)
This query would return the rows of the CDC table, which are constructed on the
server side by CDCUncoveredIndexRegionScanner by joining the uncovered index
row versions with the corresponding data table row versions (using raw scans).
The above select query can be hinted with a new CDC hint to return just the
actual change, or the pre, post, or latest image of the row, or a combination of
them, overriding the default JSON column format defined by the CDC DDL
statement.
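A client could build the CDC query with bind markers for the two time bounds and prepend the hint only when the caller wants to override the DDL's default JSON format. The hint name CDC_INCLUDE below is hypothetical, since the proposal does not name the new hint; the /*+ ... */ comment form is the one Phoenix already uses for hints.

```java
import java.util.EnumSet;
import java.util.stream.Collectors;

/** Sketch of a client-side CDC query builder; CDC_INCLUDE is a hypothetical hint name. */
final class CdcQueryBuilder {
    enum Include { CHANGE, PRE, POST, LATEST }

    /**
     * Returns the CDC time range query with two bind markers: the lower bound
     * (inclusive) and the upper bound (exclusive). An empty set means no hint,
     * i.e. the JSON format defined by the CDC DDL is used.
     */
    static String build(String cdcTable, EnumSet<Include> override) {
        String hint = override.isEmpty()
                ? ""
                // EnumSet iterates in declaration order, so the hint text is deterministic.
                : "/*+ CDC_INCLUDE(" + override.stream().map(Enum::name)
                        .collect(Collectors.joining(",")) + ") */ ";
        return "SELECT " + hint + "* FROM " + cdcTable
                + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?";
    }
}
```

With a Phoenix JDBC connection, the two markers could then be bound through PreparedStatement.setTimestamp, with the lower bound inclusive and the upper bound exclusive.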
The CDC application will run the above query in a loop, advancing the time
range on each iteration. When the difference between the current time of the
application and the upper limit of the time range of the query becomes less
than s milliseconds, say x milliseconds, the application thread needs to sleep
s - x milliseconds before running the query. The value for s can be small, such
as 1000 milliseconds. This makes sure that time skew among the server wall
clocks does not lead to data loss: a server whose clock lags behind may still
write changes with timestamps below the upper limit after the query has run.
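The sleep rule can be sketched as a small Java helper plus the surrounding loop. The names are hypothetical, and WindowQuery.run stands in for executing the CDC query over [from, to); consecutive windows share their boundary so that no change falls between them.

```java
/** Sketch of the client-side CDC polling loop; all names are hypothetical. */
final class CdcPollingWindow {
    /** Stand-in for running the CDC query over [fromInclusive, toExclusive). */
    interface WindowQuery {
        void run(long fromInclusiveMillis, long toExclusiveMillis);
    }

    /**
     * Milliseconds to sleep before querying a window whose upper bound is
     * upperBoundMillis: if the bound is x ms behind now and x < s, sleep s - x.
     */
    static long sleepBeforeQuery(long nowMillis, long upperBoundMillis, long safetyMarginMillis) {
        long lag = nowMillis - upperBoundMillis; // x in the text; negative if the bound is in the future
        return lag >= safetyMarginMillis ? 0 : safetyMarginMillis - lag;
    }

    /** Runs `iterations` consecutive windows and returns the next window's lower bound. */
    static long poll(long startMillis, long windowMillis, long safetyMarginMillis,
                     int iterations, WindowQuery query) {
        long lower = startMillis;
        for (int i = 0; i < iterations; i++) {
            long upper = lower + windowMillis;
            long sleepMillis = sleepBeforeQuery(System.currentTimeMillis(), upper, safetyMarginMillis);
            if (sleepMillis > 0) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return lower; // stop; the window starting at `lower` was not queried
                }
            }
            query.run(lower, upper);
            lower = upper; // next window starts exactly where this one ended
        }
        return lower;
    }
}
```

With s = 1000, if the upper limit of the next window is 300 milliseconds behind the current time, the helper returns 700, so the query runs once the upper limit is a full second in the past.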
A global time-based index may create hotspotting during writes, because the
index rows for new changes have the most recent timestamps and thus keep going
to the same region of the global index. Since the global index would be
uncovered, index updates will usually be smaller than the data table updates.
If we assume that index mutations are n times smaller than data table
mutations, then a single index region will be able to sustain writes from n
data table regions if the data table does not have any other indexes. When the
data table has other indexes, data table writes can slow down by a factor of 3
or so. This allows a single index region to keep up with 3n data table regions.
If the number of active data table regions is more than a single index region
can sustain, then we need to distribute the load across multiple index regions
using salting.
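The sizing argument above reduces to simple arithmetic: one index region absorbs n data table regions without other indexes, or about 3n with them, so the number of salt buckets needed is the number of active data table regions divided by that capacity, rounded up. A Java sketch, assuming writes spread evenly across salt buckets; n and the slowdown factor are estimates the operator would have to supply, and the class name is hypothetical:

```java
/** Hypothetical sizing helper for salting a global time-based CDC index. */
final class CdcIndexSalting {
    private CdcIndexSalting() {}

    /**
     * Data table regions one index region can keep up with. sizeRatio is n (index
     * mutations n times smaller); slowdownFactor is 1 when the data table has no
     * other indexes and roughly 3 when it does.
     */
    static long regionsPerIndexRegion(long sizeRatio, long slowdownFactor) {
        return sizeRatio * slowdownFactor;
    }

    /** Minimum number of salt buckets so each index region stays within capacity. */
    static int saltBuckets(long activeDataRegions, long sizeRatio, long slowdownFactor) {
        long capacity = regionsPerIndexRegion(sizeRatio, slowdownFactor);
        long buckets = (activeDataRegions + capacity - 1) / capacity; // ceiling division
        return (int) Math.max(1, buckets); // at least one bucket
    }
}
```

For example, with n = 10 and 100 active data table regions, this gives 10 salt buckets when the data table has no other indexes and 4 when it does (100 / 30, rounded up).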
A local time-based index does not have the hotspotting issue but can result in
slower CDC queries for tables with a large number of regions, since a time range
query on a local index has to be run against every data table region. That is
why this proposal suggests using global indexes by default.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)