I have a requirement where we need to collect N different events and store them for analysis. I am having trouble coming up with a general architecture for this.

FINAL REQUIREMENTS

The end goal of the system is to store the raw events as they appear in the Kafka Queue for downstream processing.

Let's say I have three different types of events coming into my system [ eventA, eventB, eventC ]. Each of these events has its own unique schema.

The end goal is to store something like this in Azure Data Lake Gen 2 so that it can be processed later

For 2022-02-01 : /2022-02-01/eventA/<rawDataHere.parquet> , /2022-02-01/eventB/<rawDataHere.parquet> , /2022-02-01/eventC/<rawDataHere.parquet>

For 2022-02-02: /2022-02-02/eventA/<rawDataHere.parquet> , /2022-02-02/eventB/<rawDataHere.parquet> , /2022-02-02/eventC/<rawDataHere.parquet>

For 2022-02-03: /2022-02-03/eventA/<rawDataHere.parquet> , /2022-02-03/eventB/<rawDataHere.parquet> , /2022-02-03/eventC/<rawDataHere.parquet>

DETAILED REQUIREMENTS

  • At any point in time, there can be N different types of events. Think of a type of event as a specific stream of events [ For eg, userClick is a type of event and userPageOpen is a different type of event. These events are independent of each other. ]
  • For each event type, the structure is guaranteed to be consistent [ For eg, for event type userClick, it'll always have the same properties with the same schema ], but the schemas for different event types will always be different.
  • The minimum required partitioning for each event is date.
  • There's no requirement to read multiple events at any point of time. During analysis, the user will only read one type of event.
  • The end goal is to store event-wise, date-wise Parquet files [ We're also considering Delta Lake by Databricks ] for downstream analysis, which is done in an ad-hoc fashion.

WHAT WE'RE PLANNING TO DO

  • KAFKA
    • Given the high ingest rate [ 25000 events / second ], it's important to stream the data through Kafka.
    • The question here is whether we should use a separate topic for each event type or stream everything into a single topic with a structure like {"eventType": "userClick", "eventData": {}}
    • Both of these approaches seem problematic on the consumer side [ Explained in the next section ]
  • CONSUMER [ SPARK ]
    • Again, Spark seems a decent choice here since we'd have a high rate of ingestion which will only go up as time goes on.
    • If we have a single topic, we should be able to group by eventType, but this seems to corrupt the schema inferred by Spark. Even if we store the schema for each event type in a metastore, I'm not sure how we can group by event, turn each event into a separate dataframe, and pass in the schema. Even once this is done, there would have to be N writeStreams [ one df.writeStream per event ], which doesn't seem like a wise thing to do with streaming. [ A rough sketch of this approach is included after this list. ]
    • If we have multiple topics, we'll need N different jobs [ 1 for each eventType ], and we'll manually need to add a new job with a schedule every time an event is added. This also seems expensive, as we'd need N different clusters for processing.
    • In this approach, we'll need to make sure we maximise file size [ 1 GB minimum ] while minimising the number of files, as storing an increasing number of small files negatively impacts query performance when processing later.
  • CONSUMER [ NON-SPARK ]
    • Given the issues with Spark, what we could potentially do is write a raw consumer in Java and keep collecting data in 15-minute intervals. Once an interval is over, we can flush the data out to a raw file [ after grouping by eventType ] and write it to Azure Data Lake. We could then have a separate job running every day which reads all the data from the previous day, compacts it and writes it into parquet files. [ A sketch of such a consumer also follows the list. ]
    • This approach seems doable, but I'd like to have a simple architecture if possible. Every extra processing step comes with additional overhead and complexity, and I'd like to figure out the simplest way to achieve my target.
    • As with the Spark approach, we'll need to maximise file size [ 1 GB minimum ] while minimising the number of files.
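
To make the single-topic Spark idea above concrete, here is a rough sketch [ not a tested implementation ] of what it could look like: one streaming query per known event type, each filtering the shared topic on eventType, applying that type's schema, and writing date-partitioned parquet. The topic name, broker address, output paths and the schemaFor() lookup are placeholders standing in for a real metastore.

    // Sketch only: one query per event type against a single shared "events" topic.
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.types.StructType;
    import static org.apache.spark.sql.functions.*;

    public class EventSink {
        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession.builder().appName("event-sink").getOrCreate();

            Dataset<Row> raw = spark.readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
                    .option("subscribe", "events")                      // single shared topic (assumption)
                    .load()
                    .selectExpr("CAST(value AS STRING) AS json");

            // One writeStream per known event type; in practice the schema would come from a metastore.
            for (String eventType : new String[]{"eventA", "eventB", "eventC"}) {
                StructType schema = schemaFor(eventType);               // hypothetical lookup

                Dataset<Row> events = raw
                        .filter(get_json_object(col("json"), "$.eventType").equalTo(eventType))
                        .select(from_json(get_json_object(col("json"), "$.eventData"), schema).alias("e"))
                        .select("e.*")
                        .withColumn("date", current_date());            // or derive from an event timestamp

                // Note: this yields <eventType>/date=YYYY-MM-DD/... rather than the
                // /<date>/<eventType>/ layout shown above.
                events.writeStream()
                        .format("parquet")
                        .option("path", "abfss://lake@account.dfs.core.windows.net/" + eventType)
                        .option("checkpointLocation", "/checkpoints/" + eventType)
                        .partitionBy("date")
                        .start();
            }
            spark.streams().awaitAnyTermination();
        }

        // Placeholder for reading the per-event schema from a metastore.
        static StructType schemaFor(String eventType) {
            return new StructType().add("userId", "string").add("ts", "timestamp");
        }
    }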
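
And a minimal sketch of the non-Spark consumer idea, assuming the producer puts the event type in the message key: a plain Java Kafka consumer buffers raw records per event type, flushes each buffer roughly every 15 minutes, and only then commits offsets. The topic name and the writeToLake() upload step are hypothetical.

    import org.apache.kafka.clients.consumer.*;
    import java.time.Duration;
    import java.util.*;

    public class BufferingConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");           // placeholder
            props.put("group.id", "raw-archiver");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");                // commit only after a successful flush

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("events"));
                Map<String, List<String>> buffers = new HashMap<>(); // eventType -> raw JSON lines
                long lastFlush = System.currentTimeMillis();

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                    for (ConsumerRecord<String, String> r : records) {
                        // Assumes the event type is the message key.
                        buffers.computeIfAbsent(r.key(), k -> new ArrayList<>()).add(r.value());
                    }
                    if (System.currentTimeMillis() - lastFlush >= 15 * 60 * 1000) {
                        for (Map.Entry<String, List<String>> e : buffers.entrySet()) {
                            writeToLake(e.getKey(), e.getValue());   // hypothetical ADLS upload
                        }
                        consumer.commitSync();                       // only after the data is safely written
                        buffers.clear();
                        lastFlush = System.currentTimeMillis();
                    }
                }
            }
        }

        // Placeholder: upload the buffered lines to /<date>/<eventType>/<file> in Azure Data Lake.
        static void writeToLake(String eventType, List<String> lines) { /* omitted */ }
    }

The daily compaction into large parquet files would still be the separate batch step described above.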

I am very new to the world of Data, so please forgive any potential silly mistake. Any help would be appreciated.

Thanks in advance :D

Sriram R

1 Answer

I've read the post at least three times, and it is still not entirely clear what you want to do. I do have a couple of points, though.

First, if you have Kafka, you likely don't need Spark at all, unless you really want to use some niche feature of Spark. Kafka is not just a queue system, it is a full-fledged stream-processing system (Kafka Streams).

You also don't need more clusters if you have more topics. You'll need one cluster, generally with at least 3 nodes if you want the system to keep working with one node down.

As far as I understand, you want to aggregate events into files partitioned by date. You can achieve all of this with Kafka and some small custom Kafka clients. For example, you can ask Kafka to always send the same event type to the same client (just use the type as the message key). That client can then write the file(s) for the types it receives. The more clients you launch, the fewer files each client is responsible for. This works best if all event types have a similar size and volume. You'll need some solution for handling nodes going offline and coming back online; Kafka will tell you when these things happen.
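
As a minimal sketch of the "type as message key" idea [ topic name and broker address are placeholders ]: with the default partitioner, every record of a given event type hashes to the same partition and therefore ends up at one consumer in the group.

    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;

    public class TypedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // Key = event type; the default partitioner hashes the key, so all
                // "userClick" records land on the same partition (and the same consumer).
                producer.send(new ProducerRecord<>("events", "userClick", "{\"userId\":\"u1\"}"));
                producer.send(new ProducerRecord<>("events", "userPageOpen", "{\"userId\":\"u1\"}"));
            }
        }
    }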

If each event type used its own topic, that would make tracking progress somewhat easier to manage. Note, however, that topics are not dynamic. They usually have to be defined statically and should be limited in number.

You could also build a Kafka Streams client to directly aggregate messages into one big message, which you then only have to write to a file 1-to-1. This would give you all sorts of things out of the box: a guarantee that you won't lose anything when clients come online and go offline, automatic progress tracking, exactly-once semantics, and so on. If your aggregate is 1 GB, though, that would probably be too big a message for Kafka.
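
A rough sketch of that Kafka Streams variant, assuming a Kafka 3.x client and placeholder topic names: group by the event-type key, concatenate raw records over a 15-minute window, and write one combined record per type per window to an output topic.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.*;
    import org.apache.kafka.streams.kstream.*;
    import java.time.Duration;
    import java.util.Properties;

    public class AggregatingApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-aggregator");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");          // placeholder

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
                   .groupByKey()                                                        // key = event type
                   .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(15)))   // Kafka 3.x API
                   .reduce((acc, next) -> acc + "\n" + next)                            // concatenate raw records
                   .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start())
                   .to("events-aggregated", Produced.with(Serdes.String(), Serdes.String()));

            new KafkaStreams(builder.build(), props).start();
        }
    }

A downstream writer would then dump each aggregated record into a file 1-to-1, subject to the message-size caveat above.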

Or you could potentially come up with a mix of both of the above. It is hard to say without knowing more.

I hope this gives you some pointers, though.

Robert Bräutigam
  • Hey @Robert. The end goal of the system is to store the raw data for further ad-hoc analysis. The issue with using eventType as the key is that some event types have a very low number of events per day [ 1M - 5M ] while others have a very high number [ the largest event has about 1B events / day ]. This makes it hard to partition events by key in Kafka. I have updated the question to include more details; hope that helps clarify it. – Sriram R Feb 27 '22 at 03:00
  • Let's say I poll Kafka every 15 mins and flush the data into a file; there's a good possibility of many small files being created, which isn't ideal for analytical workloads. For this I'd have to run a separate service which compacts these small files into larger files. Although that's doable, it introduces one more hop in the system, and I'm trying to simplify it as much as possible. – Sriram R Feb 27 '22 at 03:02
  • To be clear, I want to store raw data in a storage service like S3 or Azure Blob Storage; there are no aggregation requirements here. Any aggregations will be done after the raw data is stored in a Data Lake. – Sriram R Feb 27 '22 at 03:26
  • @SriramR Have you considered resetting the consumer index and re-trying at next interval in cases where the file size is too small (and writing it anyway if it's for a full day)? – Jason Weber Mar 05 '22 at 02:50
  • @SriramR This seems like an aggregation task--the goal being to aggregate individual messages into a file. The example shows the messages ordered by time, which if required is also relevant. – Jason Weber Mar 05 '22 at 02:55
  • @JasonWeber agreed it's an aggregation task. My issue is in handling N different types of events. N events will have to be stored separately. Here comes the complexity. – Sriram R Mar 20 '22 at 04:45