kafka

KafkaReader

KafkaReader(url, metadata, start=None)

Bases: StreamReader

A connection object to read data from Kafka.

Parameters:

Name      Type                Description                                       Default
url       str                 URL of the Kafka broker to connect to.            required
metadata  dict[str, Channel]  Dictionary of channel metadata for the request.   required
start     int | None          GPS start time of the stream in nanoseconds;      None
                              defaults to "now".
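
A minimal construction sketch (the broker address below is hypothetical, and the metadata dictionary is taken as given, since building Channel objects is outside this excerpt):

from arrakis.kafka import KafkaReader

def make_reader(metadata):
    # metadata: dict[str, Channel], e.g. from an Arrakis metadata query.
    # Each Channel must carry partition_id and partition_index; __init__
    # asserts both are set. "localhost:9092" is a hypothetical broker.
    return KafkaReader(url="localhost:9092", metadata=metadata, start=None)
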
Source code in arrakis/kafka.py
def __init__(
    self,
    url: str,
    metadata: dict[str, Channel],
    start: int | None = None,
):
    self.url = url
    self.metadata = metadata
    self.start = start

    # track stream -> index -> channel
    self._name_lookup: dict[str, dict[int, Channel]] = defaultdict(defaultdict)
    # filters for partitions
    self._partition_filter_id_sets = defaultdict(set)

    for channel in self.metadata.values():
        partition_id = channel.partition_id
        assert partition_id is not None
        partition_index = channel.partition_index
        assert partition_index is not None
        stream_id = self._id(partition_id)
        self._name_lookup[stream_id][partition_index] = channel
        self._partition_filter_id_sets[partition_id].add(partition_index)

    # pre-compute Arrow arrays for filtering to avoid repeated
    # array creation
    self._partition_filter_id_arrays = {
        partition_id: pyarrow.array(channel_id, type=pyarrow.int32())
        for partition_id, channel_id in self._partition_filter_id_sets.items()
    }

    # Optimize IPC stream reader creation/destruction overhead
    self._ipc_options = pyarrow.ipc.IpcReadOptions(use_threads=False)

    # create Kafka consumer
    consumer_settings = {
        "bootstrap.servers": url,
        "group.id": generate_groupid(),
        "message.max.bytes": 10_000_000,  # 10 MB
        "enable.auto.commit": False,
    }
    self._consumer = Consumer(consumer_settings)
    logger.debug("Kafka consumer: %s", consumer_settings)
    self._topics = [
        f"arrakis-{partition_id}" for partition_id in self._partition_filter_id_sets
    ]
    logger.debug("kafka topics: %s", self._topics)

generate_groupid

generate_groupid()

Generate a random Kafka group ID.

Source code in arrakis/kafka.py
def generate_groupid() -> str:
    """Generate a random Kafka group ID."""
    return random_alphanum(16)

random_alphanum

random_alphanum(n)

Generate a random alpha-numeric sequence of N characters.

Source code in arrakis/kafka.py
def random_alphanum(n: int) -> str:
    """Generate a random alpha-numeric sequence of N characters."""
    alphanum = string.ascii_uppercase + string.digits
    return "".join(random.SystemRandom().choice(alphanum) for _ in range(n))