Package: python-kafka
Version: 1.3.3-3
Severity: normal
Tags: patch
User: ubuntu-de...@lists.ubuntu.com
Usertags: origin-ubuntu cosmic ubuntu-patch

Dear Maintainer,

In Ubuntu, the attached patch was applied to achieve the following:

  * d/p/py37-compat.patch: Python 3.7 compatibility fixes.
    The patch was picked from the upstream VCS.

Thanks for considering the patch.

-- System Information:
Debian Release: buster/sid
  APT prefers cosmic
  APT policy: (500, 'cosmic')
Architecture: amd64 (x86_64)
Foreign Architectures: i386

Kernel: Linux 4.15.0-23-generic (SMP w/4 CPU cores)
Locale: LANG=en_GB.UTF-8, LC_CTYPE=en_GB.UTF-8 (charmap=UTF-8), LANGUAGE=en_GB:en (charmap=UTF-8)
Shell: /bin/sh linked to /bin/dash
Init: systemd (via /run/systemd/system)
LSM: AppArmor: enabled

diff -Nru python-kafka-1.3.3/debian/patches/1 python-kafka-1.3.3/debian/patches/1 --- python-kafka-1.3.3/debian/patches/1 1969-12-31 18:00:00.000000000 -0600 +++ python-kafka-1.3.3/debian/patches/1 2018-07-05 05:26:10.000000000 -0500 @@ -0,0 +1,318 @@ +From b62006aeb86258b4b1ef2735bebb1fe99459b82d Mon Sep 17 00:00:00 2001 +From: Dana Powers <dana.pow...@gmail.com> +Date: Fri, 23 Mar 2018 05:58:55 -0700 +Subject: [PATCH] Change SimpleProducer to use async_send (async is reserved in + py37) (#1454) + +--- + docs/simple.rst | 8 ++++---- + kafka/producer/base.py | 38 +++++++++++++++++++++++--------------- + kafka/producer/keyed.py | 2 +- + kafka/producer/simple.py | 2 +- + test/test_failover_integration.py | 8 ++++---- + test/test_producer_integration.py | 8 ++++---- + test/test_producer_legacy.py | 10 +++++----- + 7 files changed, 42 insertions(+), 34 deletions(-) + +diff --git a/docs/simple.rst b/docs/simple.rst +index 8192a8b7..afdb9756 100644 +--- a/docs/simple.rst ++++ b/docs/simple.rst +@@ -49,7 +49,7 @@ Asynchronous Mode + + # To send messages asynchronously + client = SimpleClient('localhost:9092') +- producer = SimpleProducer(client, async=True) ++ producer = SimpleProducer(client, async_send=True) + producer.send_messages('my-topic', b'async message') + + # To send messages in batch. 
You can use any of the available +@@ -60,7 +60,7 @@ Asynchronous Mode + # * If the producer dies before the messages are sent, there will be losses + # * Call producer.stop() to send the messages and cleanup + producer = SimpleProducer(client, +- async=True, ++ async_send=True, + batch_send_every_n=20, + batch_send_every_t=60) + +@@ -73,7 +73,7 @@ Synchronous Mode + + # To send messages synchronously + client = SimpleClient('localhost:9092') +- producer = SimpleProducer(client, async=False) ++ producer = SimpleProducer(client, async_send=False) + + # Note that the application is responsible for encoding messages to type bytes + producer.send_messages('my-topic', b'some message') +@@ -88,7 +88,7 @@ Synchronous Mode + # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed + # by all in sync replicas before sending a response + producer = SimpleProducer(client, +- async=False, ++ async_send=False, + req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, + ack_timeout=2000, + sync_fail_on_error=False) +diff --git a/kafka/producer/base.py b/kafka/producer/base.py +index c038bd3a..e8d6c3d2 100644 +--- a/kafka/producer/base.py ++++ b/kafka/producer/base.py +@@ -226,7 +226,7 @@ class Producer(object): + + Arguments: + client (kafka.SimpleClient): instance to use for broker +- communications. If async=True, the background thread will use ++ communications. If async_send=True, the background thread will use + :meth:`client.copy`, which is expected to return a thread-safe + object. + codec (kafka.protocol.ALL_CODECS): compression codec to use. +@@ -238,11 +238,11 @@ class Producer(object): + sync_fail_on_error (bool, optional): whether sync producer should + raise exceptions (True), or just return errors (False), + defaults to True. +- async (bool, optional): send message using a background thread, ++ async_send (bool, optional): send message using a background thread, + defaults to False. 
+- batch_send_every_n (int, optional): If async is True, messages are ++ batch_send_every_n (int, optional): If async_send is True, messages are + sent in batches of this size, defaults to 20. +- batch_send_every_t (int or float, optional): If async is True, ++ batch_send_every_t (int or float, optional): If async_send is True, + messages are sent immediately after this timeout in seconds, even + if there are fewer than batch_send_every_n, defaults to 20. + async_retry_limit (int, optional): number of retries for failed messages +@@ -268,8 +268,10 @@ class Producer(object): + defaults to 30. + + Deprecated Arguments: ++ async (bool, optional): send message using a background thread, ++ defaults to False. Deprecated, use 'async_send' + batch_send (bool, optional): If True, messages are sent by a background +- thread in batches, defaults to False. Deprecated, use 'async' ++ thread in batches, defaults to False. Deprecated, use 'async_send' + """ + ACK_NOT_REQUIRED = 0 # No ack is required + ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log +@@ -282,8 +284,8 @@ def __init__(self, client, + codec=None, + codec_compresslevel=None, + sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT, +- async=False, +- batch_send=False, # deprecated, use async ++ async_send=False, ++ batch_send=False, # deprecated, use async_send + batch_send_every_n=BATCH_SEND_MSG_COUNT, + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + async_retry_limit=ASYNC_RETRY_LIMIT, +@@ -292,15 +294,21 @@ def __init__(self, client, + async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT, + async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, +- async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS): ++ async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS, ++ **kwargs): ++ ++ # async renamed async_send for python3.7 support ++ if 'async' in kwargs: ++ log.warning('Deprecated async option found -- use async_send') ++ async_send = kwargs['async'] + +- if async: ++ if async_send: 
+ assert batch_send_every_n > 0 + assert batch_send_every_t > 0 + assert async_queue_maxsize >= 0 + + self.client = client +- self.async = async ++ self.async_send = async_send + self.req_acks = req_acks + self.ack_timeout = ack_timeout + self.stopped = False +@@ -313,7 +321,7 @@ def __init__(self, client, + self.codec = codec + self.codec_compresslevel = codec_compresslevel + +- if self.async: ++ if self.async_send: + # Messages are sent through this queue + self.queue = Queue(async_queue_maxsize) + self.async_queue_put_timeout = async_queue_put_timeout +@@ -400,7 +408,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs): + if key is not None and not isinstance(key, six.binary_type): + raise TypeError("the key must be type bytes") + +- if self.async: ++ if self.async_send: + for idx, m in enumerate(msg): + try: + item = (TopicPartition(topic, partition), m, key) +@@ -435,7 +443,7 @@ def stop(self, timeout=None): + log.warning('timeout argument to stop() is deprecated - ' + 'it will be removed in future release') + +- if not self.async: ++ if not self.async_send: + log.warning('producer.stop() called, but producer is not async') + return + +@@ -443,7 +451,7 @@ def stop(self, timeout=None): + log.warning('producer.stop() called, but producer is already stopped') + return + +- if self.async: ++ if self.async_send: + self.queue.put((STOP_ASYNC_PRODUCER, None, None)) + self.thread_stop_event.set() + self.thread.join() +@@ -471,5 +479,5 @@ def stop(self, timeout=None): + self.stopped = True + + def __del__(self): +- if self.async and not self.stopped: ++ if self.async_send and not self.stopped: + self.stop() +diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py +index 8de3ad80..62bb733f 100644 +--- a/kafka/producer/keyed.py ++++ b/kafka/producer/keyed.py +@@ -46,4 +46,4 @@ def send(self, topic, key, msg): + return self.send_messages(topic, key, msg) + + def __repr__(self): +- return '<KeyedProducer batch=%s>' % self.async ++ return '<KeyedProducer 
batch=%s>' % self.async_send +diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py +index 589363c9..91e0abc4 100644 +--- a/kafka/producer/simple.py ++++ b/kafka/producer/simple.py +@@ -51,4 +51,4 @@ def send_messages(self, topic, *msg): + ) + + def __repr__(self): +- return '<SimpleProducer batch=%s>' % self.async ++ return '<SimpleProducer batch=%s>' % self.async_send +diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py +index 8531cfbe..797e1c8e 100644 +--- a/test/test_failover_integration.py ++++ b/test/test_failover_integration.py +@@ -60,7 +60,7 @@ def test_switch_leader(self): + # require that the server commit messages to all in-sync replicas + # so that failover doesn't lose any messages on server-side + # and we can assert that server-side message count equals client-side +- producer = Producer(self.client, async=False, ++ producer = Producer(self.client, async_send=False, + req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) + + # Send 100 random messages to a specific partition +@@ -101,7 +101,7 @@ def test_switch_leader_async(self): + partition = 0 + + # Test the base class Producer -- send_messages to a specific partition +- producer = Producer(self.client, async=True, ++ producer = Producer(self.client, async_send=True, + batch_send_every_n=15, + batch_send_every_t=3, + req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT, +@@ -146,7 +146,7 @@ def test_switch_leader_async(self): + def test_switch_leader_keyed_producer(self): + topic = self.topic + +- producer = KeyedProducer(self.client, async=False) ++ producer = KeyedProducer(self.client, async_send=False) + + # Send 10 random messages + for _ in range(10): +@@ -182,7 +182,7 @@ def test_switch_leader_keyed_producer(self): + producer.send_messages(topic, key, msg) + + def test_switch_leader_simple_consumer(self): +- producer = Producer(self.client, async=False) ++ producer = Producer(self.client, async_send=False) + consumer = SimpleConsumer(self.client, None, self.topic, 
partitions=None, auto_commit=False, iter_timeout=10) + self._send_random_messages(producer, self.topic, 0, 2) + consumer.get_messages() +diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py +index 6cd3d13a..2b810476 100644 +--- a/test/test_producer_integration.py ++++ b/test/test_producer_integration.py +@@ -216,7 +216,7 @@ def test_async_simple_producer(self): + partition = self.client.get_partition_ids_for_topic(self.topic)[0] + start_offset = self.current_offset(self.topic, partition) + +- producer = SimpleProducer(self.client, async=True, random_start=False) ++ producer = SimpleProducer(self.client, async_send=True, random_start=False) + resp = producer.send_messages(self.topic, self.msg("one")) + self.assertEqual(len(resp), 0) + +@@ -235,7 +235,7 @@ def test_batched_simple_producer__triggers_by_message(self): + batch_interval = 5 + producer = SimpleProducer( + self.client, +- async=True, ++ async_send=True, + batch_send_every_n=batch_messages, + batch_send_every_t=batch_interval, + random_start=False) +@@ -300,7 +300,7 @@ def test_batched_simple_producer__triggers_by_time(self): + batch_interval = 5 + producer = SimpleProducer( + self.client, +- async=True, ++ async_send=True, + batch_send_every_n=100, + batch_send_every_t=batch_interval, + random_start=False) +@@ -432,7 +432,7 @@ def test_async_keyed_producer(self): + + producer = KeyedProducer(self.client, + partitioner=RoundRobinPartitioner, +- async=True, ++ async_send=True, + batch_send_every_t=1) + + resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) +diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py +index 9b87c766..6d00116c 100644 +--- a/test/test_producer_legacy.py ++++ b/test/test_producer_legacy.py +@@ -73,7 +73,7 @@ def partitions(topic): + @patch('kafka.producer.base._send_upstream') + def test_producer_async_queue_overfilled(self, mock): + queue_size = 2 +- producer = Producer(MagicMock(), async=True, ++ producer = 
Producer(MagicMock(), async_send=True, + async_queue_maxsize=queue_size) + + topic = b'test-topic' +@@ -95,25 +95,25 @@ def test_producer_sync_fail_on_error(self): + with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]): + + client = SimpleClient(MagicMock()) +- producer = SimpleProducer(client, async=False, sync_fail_on_error=False) ++ producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False) + + # This should not raise + (response,) = producer.send_messages('foobar', b'test message') + self.assertEqual(response, error) + +- producer = SimpleProducer(client, async=False, sync_fail_on_error=True) ++ producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True) + with self.assertRaises(FailedPayloadsError): + producer.send_messages('foobar', b'test message') + + def test_cleanup_is_not_called_on_stopped_producer(self): +- producer = Producer(MagicMock(), async=True) ++ producer = Producer(MagicMock(), async_send=True) + producer.stopped = True + with patch.object(producer, 'stop') as mocked_stop: + producer._cleanup_func(producer) + self.assertEqual(mocked_stop.call_count, 0) + + def test_cleanup_is_called_on_running_producer(self): +- producer = Producer(MagicMock(), async=True) ++ producer = Producer(MagicMock(), async_send=True) + producer.stopped = False + with patch.object(producer, 'stop') as mocked_stop: + producer._cleanup_func(producer) diff -Nru python-kafka-1.3.3/debian/patches/py37-compat.patch python-kafka-1.3.3/debian/patches/py37-compat.patch --- python-kafka-1.3.3/debian/patches/py37-compat.patch 1969-12-31 18:00:00.000000000 -0600 +++ python-kafka-1.3.3/debian/patches/py37-compat.patch 2018-07-05 05:26:23.000000000 -0500 @@ -0,0 +1,304 @@ +From b62006aeb86258b4b1ef2735bebb1fe99459b82d Mon Sep 17 00:00:00 2001 +From: Dana Powers <dana.pow...@gmail.com> +Date: Fri, 23 Mar 2018 05:58:55 -0700 +Subject: [PATCH] Change SimpleProducer to use async_send (async is reserved in + py37) (#1454) 
+ +--- + docs/simple.rst | 8 ++++---- + kafka/producer/base.py | 38 +++++++++++++++++++++++--------------- + kafka/producer/keyed.py | 2 +- + kafka/producer/simple.py | 2 +- + test/test_failover_integration.py | 8 ++++---- + test/test_producer_integration.py | 8 ++++---- + test/test_producer_legacy.py | 10 +++++----- + 7 files changed, 42 insertions(+), 34 deletions(-) + +--- a/docs/simple.rst ++++ b/docs/simple.rst +@@ -49,7 +49,7 @@ Asynchronous Mode + + # To send messages asynchronously + client = SimpleClient('localhost:9092') +- producer = SimpleProducer(client, async=True) ++ producer = SimpleProducer(client, async_send=True) + producer.send_messages('my-topic', b'async message') + + # To send messages in batch. You can use any of the available +@@ -60,7 +60,7 @@ Asynchronous Mode + # * If the producer dies before the messages are sent, there will be losses + # * Call producer.stop() to send the messages and cleanup + producer = SimpleProducer(client, +- async=True, ++ async_send=True, + batch_send_every_n=20, + batch_send_every_t=60) + +@@ -73,7 +73,7 @@ Synchronous Mode + + # To send messages synchronously + client = SimpleClient('localhost:9092') +- producer = SimpleProducer(client, async=False) ++ producer = SimpleProducer(client, async_send=False) + + # Note that the application is responsible for encoding messages to type bytes + producer.send_messages('my-topic', b'some message') +@@ -88,7 +88,7 @@ Synchronous Mode + # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed + # by all in sync replicas before sending a response + producer = SimpleProducer(client, +- async=False, ++ async_send=False, + req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, + ack_timeout=2000, + sync_fail_on_error=False) +--- a/kafka/producer/base.py ++++ b/kafka/producer/base.py +@@ -227,7 +227,7 @@ class Producer(object): + + Arguments: + client (kafka.SimpleClient): instance to use for broker +- communications. 
If async=True, the background thread will use ++ communications. If async_send=True, the background thread will use + :meth:`client.copy`, which is expected to return a thread-safe + object. + codec (kafka.protocol.ALL_CODECS): compression codec to use. +@@ -239,11 +239,11 @@ class Producer(object): + sync_fail_on_error (bool, optional): whether sync producer should + raise exceptions (True), or just return errors (False), + defaults to True. +- async (bool, optional): send message using a background thread, ++ async_send (bool, optional): send message using a background thread, + defaults to False. +- batch_send_every_n (int, optional): If async is True, messages are ++ batch_send_every_n (int, optional): If async_send is True, messages are + sent in batches of this size, defaults to 20. +- batch_send_every_t (int or float, optional): If async is True, ++ batch_send_every_t (int or float, optional): If async_send is True, + messages are sent immediately after this timeout in seconds, even + if there are fewer than batch_send_every_n, defaults to 20. + async_retry_limit (int, optional): number of retries for failed messages +@@ -269,8 +269,10 @@ class Producer(object): + defaults to 30. + + Deprecated Arguments: ++ async (bool, optional): send message using a background thread, ++ defaults to False. Deprecated, use 'async_send' + batch_send (bool, optional): If True, messages are sent by a background +- thread in batches, defaults to False. Deprecated, use 'async' ++ thread in batches, defaults to False. 
Deprecated, use 'async_send' + """ + ACK_NOT_REQUIRED = 0 # No ack is required + ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log +@@ -283,8 +285,8 @@ class Producer(object): + codec=None, + codec_compresslevel=None, + sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT, +- async=False, +- batch_send=False, # deprecated, use async ++ async_send=False, ++ batch_send=False, # deprecated, use async_send + batch_send_every_n=BATCH_SEND_MSG_COUNT, + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + async_retry_limit=ASYNC_RETRY_LIMIT, +@@ -293,15 +295,21 @@ class Producer(object): + async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT, + async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, +- async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS): ++ async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS, ++ **kwargs): ++ ++ # async renamed async_send for python3.7 support ++ if 'async' in kwargs: ++ log.warning('Deprecated async option found -- use async_send') ++ async_send = kwargs['async'] + +- if async: ++ if async_send: + assert batch_send_every_n > 0 + assert batch_send_every_t > 0 + assert async_queue_maxsize >= 0 + + self.client = client +- self.async = async ++ self.async_send = async_send + self.req_acks = req_acks + self.ack_timeout = ack_timeout + self.stopped = False +@@ -314,7 +322,7 @@ class Producer(object): + self.codec = codec + self.codec_compresslevel = codec_compresslevel + +- if self.async: ++ if self.async_send: + # Messages are sent through this queue + self.queue = Queue(async_queue_maxsize) + self.async_queue_put_timeout = async_queue_put_timeout +@@ -401,7 +409,7 @@ class Producer(object): + if key is not None and not isinstance(key, six.binary_type): + raise TypeError("the key must be type bytes") + +- if self.async: ++ if self.async_send: + for idx, m in enumerate(msg): + try: + item = (TopicPartition(topic, partition), m, key) +@@ -436,7 +444,7 @@ class Producer(object): + log.warning('timeout argument to 
stop() is deprecated - ' + 'it will be removed in future release') + +- if not self.async: ++ if not self.async_send: + log.warning('producer.stop() called, but producer is not async') + return + +@@ -444,7 +452,7 @@ class Producer(object): + log.warning('producer.stop() called, but producer is already stopped') + return + +- if self.async: ++ if self.async_send: + self.queue.put((STOP_ASYNC_PRODUCER, None, None)) + self.thread_stop_event.set() + self.thread.join() +@@ -472,5 +480,5 @@ class Producer(object): + self.stopped = True + + def __del__(self): +- if self.async and not self.stopped: ++ if self.async_send and not self.stopped: + self.stop() +--- a/kafka/producer/keyed.py ++++ b/kafka/producer/keyed.py +@@ -46,4 +46,4 @@ class KeyedProducer(Producer): + return self.send_messages(topic, key, msg) + + def __repr__(self): +- return '<KeyedProducer batch=%s>' % self.async ++ return '<KeyedProducer batch=%s>' % self.async_send +--- a/kafka/producer/simple.py ++++ b/kafka/producer/simple.py +@@ -51,4 +51,4 @@ class SimpleProducer(Producer): + ) + + def __repr__(self): +- return '<SimpleProducer batch=%s>' % self.async ++ return '<SimpleProducer batch=%s>' % self.async_send +--- a/test/test_failover_integration.py ++++ b/test/test_failover_integration.py +@@ -61,7 +61,7 @@ class TestFailover(KafkaIntegrationTestC + # require that the server commit messages to all in-sync replicas + # so that failover doesn't lose any messages on server-side + # and we can assert that server-side message count equals client-side +- producer = Producer(self.client, async=False, ++ producer = Producer(self.client, async_send=False, + req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) + + # Send 100 random messages to a specific partition +@@ -102,7 +102,7 @@ class TestFailover(KafkaIntegrationTestC + partition = 0 + + # Test the base class Producer -- send_messages to a specific partition +- producer = Producer(self.client, async=True, ++ producer = Producer(self.client, async_send=True, + 
batch_send_every_n=15, + batch_send_every_t=3, + req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT, +@@ -147,7 +147,7 @@ class TestFailover(KafkaIntegrationTestC + def test_switch_leader_keyed_producer(self): + topic = self.topic + +- producer = KeyedProducer(self.client, async=False) ++ producer = KeyedProducer(self.client, async_send=False) + + # Send 10 random messages + for _ in range(10): +@@ -183,7 +183,7 @@ class TestFailover(KafkaIntegrationTestC + producer.send_messages(topic, key, msg) + + def test_switch_leader_simple_consumer(self): +- producer = Producer(self.client, async=False) ++ producer = Producer(self.client, async_send=False) + consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10) + self._send_random_messages(producer, self.topic, 0, 2) + consumer.get_messages() +--- a/test/test_producer_integration.py ++++ b/test/test_producer_integration.py +@@ -184,7 +184,7 @@ class TestKafkaProducerIntegration(Kafka + partition = self.client.get_partition_ids_for_topic(self.topic)[0] + start_offset = self.current_offset(self.topic, partition) + +- producer = SimpleProducer(self.client, async=True, random_start=False) ++ producer = SimpleProducer(self.client, async_send=True, random_start=False) + resp = producer.send_messages(self.topic, self.msg("one")) + self.assertEqual(len(resp), 0) + +@@ -203,7 +203,7 @@ class TestKafkaProducerIntegration(Kafka + batch_interval = 5 + producer = SimpleProducer( + self.client, +- async=True, ++ async_send=True, + batch_send_every_n=batch_messages, + batch_send_every_t=batch_interval, + random_start=False) +@@ -268,7 +268,7 @@ class TestKafkaProducerIntegration(Kafka + batch_interval = 5 + producer = SimpleProducer( + self.client, +- async=True, ++ async_send=True, + batch_send_every_n=100, + batch_send_every_t=batch_interval, + random_start=False) +@@ -400,7 +400,7 @@ class TestKafkaProducerIntegration(Kafka + + producer = KeyedProducer(self.client, + 
partitioner=RoundRobinPartitioner, +- async=True, ++ async_send=True, + batch_send_every_t=1) + + resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) +--- a/test/test_producer_legacy.py ++++ b/test/test_producer_legacy.py +@@ -73,7 +73,7 @@ class TestKafkaProducer(unittest.TestCas + @patch('kafka.producer.base._send_upstream') + def test_producer_async_queue_overfilled(self, mock): + queue_size = 2 +- producer = Producer(MagicMock(), async=True, ++ producer = Producer(MagicMock(), async_send=True, + async_queue_maxsize=queue_size) + + topic = b'test-topic' +@@ -95,25 +95,25 @@ class TestKafkaProducer(unittest.TestCas + with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]): + + client = SimpleClient(MagicMock()) +- producer = SimpleProducer(client, async=False, sync_fail_on_error=False) ++ producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False) + + # This should not raise + (response,) = producer.send_messages('foobar', b'test message') + self.assertEqual(response, error) + +- producer = SimpleProducer(client, async=False, sync_fail_on_error=True) ++ producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True) + with self.assertRaises(FailedPayloadsError): + producer.send_messages('foobar', b'test message') + + def test_cleanup_is_not_called_on_stopped_producer(self): +- producer = Producer(MagicMock(), async=True) ++ producer = Producer(MagicMock(), async_send=True) + producer.stopped = True + with patch.object(producer, 'stop') as mocked_stop: + producer._cleanup_func(producer) + self.assertEqual(mocked_stop.call_count, 0) + + def test_cleanup_is_called_on_running_producer(self): +- producer = Producer(MagicMock(), async=True) ++ producer = Producer(MagicMock(), async_send=True) + producer.stopped = False + with patch.object(producer, 'stop') as mocked_stop: + producer._cleanup_func(producer) diff -Nru python-kafka-1.3.3/debian/patches/series 
python-kafka-1.3.3/debian/patches/series --- python-kafka-1.3.3/debian/patches/series 2017-11-17 06:11:12.000000000 -0600 +++ python-kafka-1.3.3/debian/patches/series 2018-07-05 05:18:26.000000000 -0500 @@ -3,3 +3,4 @@ do-not-test-lz4-compression.patch remove-multiple-privacy-breaches.patch remove-old-lz4-test.patch +py37-compat.patch