After #965 lands, we will have deprecated the TopicProducer::send_record
API, which allowed users to send a record without specifying a key for it. This API was deprecated because send_record accepted a partition number as one of the arguments, and we decided that for now, users should not be able to manually specify partitions.
This leaves a gap in our API for a particular use-case, which is sending a single record with no key. Without send_record
, the only way to send a record with no key involves using the send_all
function, which forces users to wrap their single record in some kind of iterator before passing it to the producer, e.g. like this:
producer.send_all(Some( (None, "My record data with no key") )).await?;
I believe that this is a workaround that we should not force users to deal with, as it is non-obvious and non-ergonomic. I think we need to introduce a new API method for the "single record with no key" use-case. I originally proposed a send_keyless
API in #965 (discussion at https://github.com/infinyon/fluvio/pull/965#discussion_r615967714), but we decided to exclude it from that PR in order to discuss it more, which is what this issue is for.
For reference, the send_keyless
API that I would like to introduce looks like this:
impl TopicProducer {
pub async fn send_keyless<V>(&self, value: V) -> Result<(), FluvioError>
where
V: Into<Vec<u8>> { ... }
}
So the questions I would like to resolve in this discussion are:
- Do we believe that a new API to fill this use-case is necessary? (I believe it is),
- Is there a different API that better serves this use-case (I have not thought of one)?
- Are we happy with the name
send_keyless
or would we prefer something else?
I think it makes sense to put a timeline on this. If nobody has any strong reasons why we should NOT introduce this API, I would like to add this issue to our roadmap as an action item by this Wednesday (4/21/21) to give enough time to ship it with the 0.7.4
release this Thursday.
Discussion Results
After debating the API, we've decided to implement the following:
pub struct RecordKey(RecordKeyInner);
impl RecordKey {
pub const NULL: Self = Self(RecordKeyInner::Null);
}
enum RecordKeyInner {
Null,
Key(RecordData),
}
impl<K: Into<Vec<u8>>> From<K> for RecordKey {
fn from(k: K) -> Self {
Self(RecordKeyInner::Key(RecordData::from(k)))
}
}
/// A type to hold the contents of a record's value.
pub struct RecordData(bytes::Bytes);
impl<V: Into<Vec<u8>>> From<V> for RecordData {
fn from(v: V) -> Self {
let value: Vec<u8> = v.into();
Self(value.into())
}
}
fn send<K, V>(key: K, value: V)
where
K: Into<RecordKey>,
V: Into<RecordData>,
{ ... }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_conversion() {
send(RecordKey::NULL, vec![2]);
send(vec![3],vec![4]);
send("Hello", "world");
send("Hello".to_string(), "World".to_string());
}
}
Client API producer