Feature store example (stocks)#
This notebook demonstrates how to define features and feature sets, build complex transformations, and ingest data into offline and real-time data stores. It then shows how to fetch feature vectors for training, save them for reuse in real-time pipelines, and access features and their statistics in real time.
Note
By default, this demo works with the online feature store, which is currently not part of the Open Source MLRun default deployment.
Get started#
Install the latest MLRun package and restart the notebook.
Set up the environment and project:
import mlrun
mlrun.get_or_create_project("stocks", "./")
> 2024-09-11 12:37:21,186 [info] Server and client versions are not the same but compatible: {'parsed_server_version': Version(major=1, minor=7, patch=0, prerelease='rc40', build=None), 'parsed_client_version': Version(major=1, minor=6, patch=3, prerelease=None, build=None)}
> 2024-09-11 12:37:21,273 [info] Project loaded successfully: {'project_name': 'stocks'}
<mlrun.projects.project.MlrunProject at 0x7f5736f43e80>
Create sample data for demo#
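The cell that builds the demo data is not reproduced on this page. The sketch below reconstructs `quotes`, `trades`, and `stocks` from the tables printed later in this notebook, so treat it as an approximation rather than the original cell. The `at` helper and the trade time offsets are assumptions (trade times are never shown), and any quote rows dropped by the pipeline filter would be missing from it.

```python
import datetime

import pandas as pd

# Base timestamp. The outputs suggest the current time was used; that is an assumption.
base = datetime.datetime.now()


def at(ms):
    """Return the base time shifted by `ms` milliseconds (hypothetical helper)."""
    return base + datetime.timedelta(milliseconds=ms)


# Tickers, bids, asks, and millisecond spacing are read off the preview table.
quotes = pd.DataFrame(
    {
        "time": [at(ms) for ms in (0, 0, 7, 18, 25, 26, 49, 52)],
        "ticker": ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"],
        "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
        "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03],
    }
)

# Prices and quantities come from the offline feature vector table.
# The time offsets below are assumed; the page does not show them.
trades = pd.DataFrame(
    {
        "time": [at(ms) for ms in (30, 40, 50, 50, 50)],
        "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
        "price": [51.95, 51.95, 720.77, 720.92, 98.00],
        "quantity": [75, 155, 100, 100, 100],
    }
)

# Ticker metadata, as shown in the stocks feature set output.
stocks = pd.DataFrame(
    {
        "ticker": ["MSFT", "GOOG", "AAPL"],
        "name": ["Microsoft Corporation", "Alphabet Inc", "Apple Inc"],
        "exchange": ["NASDAQ", "NASDAQ", "NASDAQ"],
    }
)
```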
View the demo data#
quotes
trades
stocks
Define, infer and ingest feature sets#
import mlrun.feature_store as fstore
from mlrun.feature_store.steps import *
from mlrun.features import MinMaxValidator
Build and ingest simple feature set (stocks)#
# add feature set without time column (stock ticker metadata)
stocks_set = fstore.FeatureSet("stocks", entities=[fstore.Entity("ticker")])
stocks_set.ingest(stocks, infer_options=fstore.InferOptions.default())
| ticker | name | exchange |
|---|---|---|
| MSFT | Microsoft Corporation | NASDAQ |
| GOOG | Alphabet Inc | NASDAQ |
| AAPL | Apple Inc | NASDAQ |
Build an advanced feature set - with feature engineering pipeline#
Define a feature set with custom data processing and time aggregation functions:
# create a new feature set
quotes_set = fstore.FeatureSet(
    "stock-quotes", entities=[fstore.Entity("ticker")], timestamp_key="time"
)
Define a custom pipeline step (Python class):
class MyMap(MapClass):
    def __init__(self, multiplier=1, **kwargs):
        super().__init__(**kwargs)
        self._multiplier = multiplier

    def do(self, event):
        # add a derived field to each incoming event
        event["multi"] = event["bid"] * self._multiplier
        return event
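To check what this step does to a single event without running a graph, a standalone sketch can help. `MyMapSketch` is a hypothetical stand-in that drops the `MapClass` base so it runs without MLRun installed; the `do` logic is unchanged.

```python
class MyMapSketch:
    """Hypothetical stand-in for MyMap, without the MapClass base class."""

    def __init__(self, multiplier=1):
        self._multiplier = multiplier

    def do(self, event):
        # Add a derived field; the incoming event dict is modified in place.
        event["multi"] = event["bid"] * self._multiplier
        return event


step = MyMapSketch(multiplier=3)
result = step.do({"ticker": "MSFT", "bid": 51.95, "ask": 51.96})
print(round(result["multi"], 2))
```

With `multiplier=3`, a bid of 51.95 yields a `multi` value of 155.85 after rounding.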
Build and show the transformation pipeline
Use storey stream processing classes along with library and custom classes:
quotes_set.graph.to("MyMap", multiplier=3).to(
    "storey.Extend", _fn="({'extra': event['bid'] * 77})"
).to("storey.Filter", "filter", _fn="(event['bid'] > 51.92)").to(FeaturesetValidator())
quotes_set.add_aggregation("ask", ["sum", "max"], "1h", "10m", name="asks1")
quotes_set.add_aggregation("ask", ["sum", "max"], "5h", "10m", name="asks5")
quotes_set.add_aggregation("bid", ["min", "max"], "1h", "10m", name="bids")
# add feature validation policy
quotes_set["bid"] = fstore.Feature(validator=MinMaxValidator(min=52, severity="info"))
# add default target definitions and plot
quotes_set.set_targets()
quotes_set.plot(rankdir="LR", with_targets=True)
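The `storey.Extend` and `storey.Filter` steps above take their logic as expression strings. The plain-Python sketch below mirrors those two expressions on a few events to show their effect. The function names `extend` and `keep` are illustrative, and the third event is hypothetical, added only so that the filter has something to drop.

```python
def extend(event):
    # Same expression as the storey.Extend step: merge an "extra" field into the event.
    return {**event, "extra": event["bid"] * 77}


def keep(event):
    # Same predicate as the storey.Filter step: pass only bids above 51.92.
    return event["bid"] > 51.92


events = [
    {"ticker": "MSFT", "bid": 51.95},
    {"ticker": "GOOG", "bid": 720.50},
    {"ticker": "MSFT", "bid": 51.90},  # hypothetical event, not in the demo data
]

# Extend every event first, then drop the ones that fail the predicate.
output = [extended for extended in map(extend, events) if keep(extended)]
for event in output:
    print(event["ticker"], round(event["extra"], 2))
```

Only the first two events survive the filter; the hypothetical 51.90 bid is dropped before it would reach the validator or the aggregations.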
Test the pipeline locally and show the results, which allows quick development and debugging:
quotes_set.preview(
    quotes,
    entity_columns=["ticker"],
    options=fstore.InferOptions.default(),
)
info! bid value is smaller than min, key=['MSFT'] args={'min': 52, 'value': '51.95'}
info! bid value is smaller than min, key=['MSFT'] args={'min': 52, 'value': '51.97'}
info! bid value is smaller than min, key=['MSFT'] args={'min': 52, 'value': '51.99'}
| ticker | asks1_sum_1h | asks1_max_1h | asks5_sum_5h | asks5_max_5h | bids_min_1h | bids_max_1h | time | bid | ask | multi | extra |
|---|---|---|---|---|---|---|---|---|---|---|---|
| GOOG | 720.93 | 720.93 | 720.93 | 720.93 | 720.50 | 720.50 | 2024-09-11 12:37:21.246888 | 720.50 | 720.93 | 2161.50 | 55478.50 |
| MSFT | 51.96 | 51.96 | 51.96 | 51.96 | 51.95 | 51.95 | 2024-09-11 12:37:21.246888 | 51.95 | 51.96 | 155.85 | 4000.15 |
| MSFT | 103.94 | 51.98 | 103.94 | 51.98 | 51.95 | 51.97 | 2024-09-11 12:37:21.253888 | 51.97 | 51.98 | 155.91 | 4001.69 |
| MSFT | 155.94 | 52.00 | 155.94 | 52.00 | 51.95 | 51.99 | 2024-09-11 12:37:21.264888 | 51.99 | 52.00 | 155.97 | 4003.23 |
| GOOG | 1441.86 | 720.93 | 1441.86 | 720.93 | 720.50 | 720.50 | 2024-09-11 12:37:21.271888 | 720.50 | 720.93 | 2161.50 | 55478.50 |
| AAPL | 98.01 | 98.01 | 98.01 | 98.01 | 97.99 | 97.99 | 2024-09-11 12:37:21.272888 | 97.99 | 98.01 | 293.97 | 7545.23 |
| GOOG | 2162.74 | 720.93 | 2162.74 | 720.93 | 720.50 | 720.50 | 2024-09-11 12:37:21.295888 | 720.50 | 720.88 | 2161.50 | 55478.50 |
| MSFT | 207.97 | 52.03 | 207.97 | 52.03 | 51.95 | 52.01 | 2024-09-11 12:37:21.298888 | 52.01 | 52.03 | 156.03 | 4004.77 |
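The `asks1_sum_1h` and `asks1_max_1h` columns above are per-ticker aggregates over a trailing one-hour window. The sketch below recomputes them with a naive exact window in plain Python. It is a simplification and not how the engine implements windows (the `"10m"` period argument is ignored here), but the results coincide for this data because all quotes fall within milliseconds of each other. The millisecond offsets are derived from the `time` column.

```python
from collections import defaultdict, deque

WINDOW_MS = 60 * 60 * 1000  # 1h window, as in the "asks1" aggregation

# (offset in ms from the first quote, ticker, ask), taken from the preview table.
quote_rows = [
    (0, "GOOG", 720.93), (0, "MSFT", 51.96), (7, "MSFT", 51.98),
    (18, "MSFT", 52.00), (25, "GOOG", 720.93), (26, "AAPL", 98.01),
    (49, "GOOG", 720.88), (52, "MSFT", 52.03),
]

windows = defaultdict(deque)  # per-ticker events still inside the window
rows = []
for ts, ticker, ask in quote_rows:
    window = windows[ticker]
    window.append((ts, ask))
    # Evict events that have fallen out of the trailing window.
    while window and window[0][0] <= ts - WINDOW_MS:
        window.popleft()
    asks = [a for _, a in window]
    rows.append((ticker, round(sum(asks), 2), max(asks)))

for row in rows:
    print(row)
```

Each printed tuple corresponds to one table row: for example, the last MSFT quote gives a running sum of 207.97 and a maximum of 52.03.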
# print the feature set object
print(quotes_set.to_yaml())
Ingest data into offline and online stores#
Ingestion writes to both default targets: Parquet (offline) and NoSQL (online).
# ingest the data and keep the resulting dataframe
df = quotes_set.ingest(quotes)
info! bid value is smaller than min, key=['MSFT'] args={'min': 52, 'value': '51.95'}
info! bid value is smaller than min, key=['MSFT'] args={'min': 52, 'value': '51.97'}
info! bid value is smaller than min, key=['MSFT'] args={'min': 52, 'value': '51.99'}
info! bid value is smaller than min, key=['MSFT'] args={'min': 52, 'value': '51.95'}
info! bid value is smaller than min, key=['MSFT'] args={'min': 52, 'value': '51.97'}
info! bid value is smaller than min, key=['MSFT'] args={'min': 52, 'value': '51.99'}
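The `info!` lines come from the `MinMaxValidator(min=52, severity="info")` policy attached to `bid`. With this severity the rows appear to be reported rather than dropped, since the flagged bids still show up in the preview table. A minimal sketch of the same check in plain Python, where `check_min` is a hypothetical helper and not part of MLRun:

```python
def check_min(key, value, min_value):
    """Return a message if `value` is below `min_value`, else None (hypothetical helper)."""
    if value < min_value:
        return f"bid value is smaller than min, key={key} min={min_value} value={value}"
    return None


# MSFT bids from the preview table.
msft_bids = [51.95, 51.97, 51.99, 52.01]

messages = []
for bid in msft_bids:
    message = check_min(["MSFT"], bid, 52)
    if message is not None:
        messages.append(message)

for message in messages:
    print(message)
```

Three of the four MSFT bids fall below 52, which matches the three distinct values reported in the log lines.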
Get an offline feature vector for training#
This example combines features from three tables using a time-travel join, in which each entity row is matched with the feature values that were current at its timestamp.
Specify a set of features and request the feature vector offline result as a dataframe:
features = [
    "stock-quotes.multi",
    "stock-quotes.asks5_sum_5h as total_ask",
    "stock-quotes.bids_min_1h",
    "stock-quotes.bids_max_1h",
    "stocks.*",
]
vector = fstore.FeatureVector(
    "stocks-vec", features, description="stocks demo feature vector"
)
vector.save()
resp = vector.get_offline_features(entity_rows=trades, entity_timestamp_column="time")
resp.to_dataframe()
> 2024-09-11 12:37:23,529 [info] Merger detected timestamp resolution incompatibility between feature set stock-quotes and others: datetime64[ns] and datetime64[us]. Converting feature set timestamp column 'time' to type datetime64[ns].
| | price | quantity | multi | total_ask | bids_min_1h | bids_max_1h | name | exchange |
|---|---|---|---|---|---|---|---|---|
| 0 | 51.95 | 75 | 155.97 | 155.94 | 51.95 | 51.99 | Microsoft Corporation | NASDAQ |
| 1 | 51.95 | 155 | 155.97 | 155.94 | 51.95 | 51.99 | Microsoft Corporation | NASDAQ |
| 2 | 720.77 | 100 | 2161.50 | 2162.74 | 720.50 | 720.50 | Alphabet Inc | NASDAQ |
| 3 | 720.92 | 100 | 2161.50 | 2162.74 | 720.50 | 720.50 | Alphabet Inc | NASDAQ |
| 4 | 98.00 | 100 | 293.97 | 98.01 | 97.99 | 97.99 | Apple Inc | NASDAQ |
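The idea behind the time-travel join can be sketched as a lookup of the latest feature row at or before each trade time. The example below is not MLRun's merger, only an illustration using `bisect` on the MSFT `multi` values from the preview table. The trade offset of 30 ms is an assumption, since trade times are not shown on this page.

```python
import bisect

# (offset in ms, multi) for MSFT, taken from the preview table; sorted by time.
msft_quotes = [(0, 155.85), (7, 155.91), (18, 155.97), (52, 156.03)]
times = [t for t, _ in msft_quotes]


def as_of(trade_time):
    """Return the latest `multi` value at or before `trade_time`, or None if there is none."""
    idx = bisect.bisect_right(times, trade_time) - 1
    return msft_quotes[idx][1] if idx >= 0 else None


# Hypothetical trade 30 ms after the first quote.
print(as_of(30))
```

A trade at that offset picks up the quote recorded at 18 ms, so later quotes never leak into the training row.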
Initialize an online feature service and use it for real-time inference#
service = vector.get_online_feature_service("")
Request the feature vector statistics, which can be used for imputing or validation:
service.vector.get_stats_table()
| | count | mean | min | 25% | 50% | 75% | max | std | hist | unique | top | freq |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| multi | 8.0 | 925.27875 | 155.85 | 155.9550 | 225.000 | 2161.5000 | 2161.50 | 1024.751408 | [[4, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,... | NaN | NaN | NaN |
| bids_min_1h | 8.0 | 308.41125 | 51.95 | 51.9500 | 74.970 | 720.5000 | 720.50 | 341.596673 | [[4, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,... | NaN | NaN | NaN |
| bids_max_1h | 8.0 | 308.42625 | 51.95 | 51.9850 | 75.000 | 720.5000 | 720.50 | 341.583803 | [[4, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,... | NaN | NaN | NaN |
| name | 3.0 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 3.0 | Microsoft Corporation | 1.0 |
| exchange | 3.0 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 1.0 | NASDAQ | 3.0 |
| total_ask | 8.0 | 617.91875 | 51.96 | 102.4575 | 181.955 | 901.1625 | 2162.74 | 784.877980 | [[4, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0,... | NaN | NaN | NaN |
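As one illustration of how these statistics might be used for imputing, the sketch below replaces missing numeric values in a response with the column mean from the table. `impute_with_mean` and the sample row are hypothetical; this shows the idea only and does not describe a built-in mechanism.

```python
import math

# Means copied from the statistics table.
FEATURE_MEANS = {
    "multi": 925.27875,
    "bids_min_1h": 308.41125,
    "bids_max_1h": 308.42625,
    "total_ask": 617.91875,
}


def impute_with_mean(features, means):
    """Return a copy of `features` with None/NaN values replaced by the known mean."""
    result = {}
    for name, value in features.items():
        missing = value is None or (isinstance(value, float) and math.isnan(value))
        result[name] = means[name] if missing and name in means else value
    return result


# Hypothetical response with a missing aggregate.
row = {"multi": 156.03, "total_ask": float("nan"), "name": "Microsoft Corporation"}
imputed = impute_with_mean(row, FEATURE_MEANS)
print(imputed)
```

Features without a stored mean, such as `name`, are passed through unchanged.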
Real-time feature vector request
service.get([{"ticker": "GOOG"}, {"ticker": "MSFT"}])
[{'bids_min_1h': 720.5,
'bids_max_1h': 720.5,
'total_ask': 2162.74,
'multi': 2161.5,
'name': 'Alphabet Inc',
'exchange': 'NASDAQ'},
{'bids_min_1h': 51.95,
'bids_max_1h': 52.01,
'total_ask': 207.97,
'multi': 156.03,
'name': 'Microsoft Corporation',
'exchange': 'NASDAQ'}]
service.get([{"ticker": "AAPL"}])
[{'bids_min_1h': 97.99,
'bids_max_1h': 97.99,
'total_ask': 98.01,
'multi': 293.97,
'name': 'Apple Inc',
'exchange': 'NASDAQ'}]
service.close()
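Conceptually, the online service behaves like a key-value lookup: each entity key maps to the latest feature values. The sketch below mimics the request and response shapes shown above with a plain dictionary. `online_table` and `get` are illustrative names, and returning `None` for an unknown key is a choice made in this sketch, not a statement about the service's behavior.

```python
# Latest feature values per ticker, copied from the responses above.
online_table = {
    "GOOG": {"bids_min_1h": 720.5, "bids_max_1h": 720.5, "total_ask": 2162.74, "multi": 2161.5},
    "MSFT": {"bids_min_1h": 51.95, "bids_max_1h": 52.01, "total_ask": 207.97, "multi": 156.03},
    "AAPL": {"bids_min_1h": 97.99, "bids_max_1h": 97.99, "total_ask": 98.01, "multi": 293.97},
}


def get(entity_rows, key="ticker"):
    """Return one feature dict per entity row, preserving request order."""
    return [online_table.get(row[key]) for row in entity_rows]


response = get([{"ticker": "GOOG"}, {"ticker": "MSFT"}])
print(response)
```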