Follow these steps to create a Faust app that will read (and write) data to a Kafka server and do some simple processing:
- First, we need to create a Faust App instance that will act as the interface between Python and the Kafka server:
app = faust.App("sample", broker="kafka://localhost")
- Next, we will create a record type that mimics the data we expect from the server:
class Record(faust.Record):
id_string: str
value: float
- Now, we'll add a topic to the Faust App object that sets the value type to the Record class that we just defined:
topic = app.topic("sample-topic", value_type=Record)
- Now, we define an agent, which is an asynchronous function wrapped in the agent decorator on the App object:
@app.agent(topic)
async def process_record(records):
async for record in records:
...