Kafka-onderwerp opschonen

Is er een manier om het onderwerp in kafka te verwijderen?

Ik heb een bericht gepusht dat te groot was in een Kafka-berichtonderwerp op mijn lokale computer, nu krijg ik een foutmelding:

kafka.common.InvalidMessageSizeException: invalid message size

Het vergroten van de fetch.sizeis hier niet ideaal, omdat ik zulke grote berichten eigenlijk niet wil accepteren.


Antwoord 1, autoriteit 100%

Werk de bewaartijd voor het onderwerp tijdelijk bij naar één seconde:

kafka-topics.sh \
  --zookeeper <zkhost>:2181 \
  --alter \
  --topic <topic name> \
  --config retention.ms=1000

En in nieuwere Kafka-releases kun je het ook doen met kafka-configs --entity-type topics

kafka-configs.sh \
  --zookeeper <zkhost>:2181 \
  --entity-type topics \
  --alter \
  --entity-name <topic name> \
  --add-config retention.ms=1000

wacht vervolgens tot de zuivering van kracht is (duur hangt af van de grootte van het onderwerp). Eenmaal verwijderd, herstelt u de vorige retention.mswaarde.


Antwoord 2, autoriteit 21%

Als u de wachtrij wilt opschonen, kunt u het onderwerp verwijderen:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

maak het vervolgens opnieuw:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

Antwoord 3, autoriteit 12%

Hoewel het geaccepteerde antwoord juist is, is die methode afgeschaft. Onderwerpconfiguratie moet nu worden gedaan via kafka-configs.

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

Configuraties die via deze methode zijn ingesteld, kunnen worden weergegeven met het commando

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic

Antwoord 4, autoriteit 12%

Dit zijn de stappen die ik volg om een ​​onderwerp met de naam MyTopicte verwijderen:

  1. Beschrijf het onderwerp en neem niet de makelaar-ID’s
  2. Stop de Apache Kafka-daemon voor elke vermelde broker-ID.
  3. Maak verbinding met elke makelaar en verwijder de map met onderwerpgegevens, b.v. rm -rf /tmp/kafka-logs/MyTopic-0. Herhaal dit voor andere partities en alle replica’s
  4. Verwijder de metadata van het onderwerp: zkCli.shen vervolgens rmr /brokers/MyTopic
  5. Start de Apache Kafka-daemon voor elke gestopte machine

Als je stap 3 hebt gemist, blijft Apache Kafka het onderwerp als aanwezig rapporteren (bijvoorbeeld als je kafka-list-topic.shuitvoert).

Getest met Apache Kafka 0.8.0.


Antwoord 5, autoriteit 10%

Getest in Kafka 0.8.2, voor het snelstartvoorbeeld:
Voeg eerst een regel toe aan het bestand server.properties onder de configuratiemap:

delete.topic.enable=true

dan kun je deze opdracht uitvoeren:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

Maak het vervolgens opnieuw, zodat klanten hun activiteiten kunnen voortzetten tegen een leeg onderwerp


Antwoord 6, autoriteit 3%

Van kafka 1.1

Een onderwerp wissen

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100

wacht minstens 1 minuut om er zeker van te zijn dat Kafka het onderwerp opschoont
verwijder de configuratie en ga dan naar de standaardwaarde

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms

Antwoord 7, autoriteit 2%

Het volgende commando kan worden gebruikt om alle bestaande berichten in het kafka-onderwerp te verwijderen:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json

De structuur van het delete.json-bestand zou als volgt moeten zijn:

{
“partities”: [
{
“topic”: “foo”,
“partitie”: 1,
“offset”: -1
}
],
“versie 1
}

Indien offset: -1 zal alle records verwijderen
(Deze opdracht is getest met KAFKA 2.0.1


Antwoord 8, Autoriteit 2%

Kafka heeft geen directe methode voor PURGE / CLEAN-UP-onderwerp (wachtrijen), maar kan dit doen via het verwijderen van dat onderwerp en het opnieuw maakt.

Eerste van het bestand Sever.Properties heeft en indien niet delete.topic.enable=true

Verwijder vervolgens het onderwerp
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

Maak het vervolgens opnieuw.

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2

Antwoord 9, Autoriteit 2%

Volgend @Steven Appleyard Antwoord heb ik de volgende opdrachten op Kafka 2.2.0 uitgevoerd en ze werkten voor mij.

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms

Antwoord 10

Update: Dit antwoord is relevant voor KAFKA 0.6. Voor Kafka 0.8 en zie later antwoord door @Patrick.

Ja, stop KAFKA en verwijder handmatig alle bestanden van de bijbehorende subdirectory (het is eenvoudig om het te vinden in Kafka Data Directory). Nadat Kafka opnieuw is opgestart, is het onderwerp leeg.


Antwoord 11

Veel grote antwoorden hier, maar onder hen vond ik er geen een over docker. Ik heb wat tijd doorgebracht om erachter te komen dat het gebruik van de Broker Container verkeerd is voor deze zaak (uiteraard !!!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

en ik had moeten gebruiken zookeeper:2181in plaats van --zookeeper localhost:2181volgens mijn bestandsbestand

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

De juiste opdracht zou

zijn

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

Ik hoop dat het iemands tijd zal redden.

Houd er ook rekening mee dat de berichten niet onmiddellijk worden verwijderd en het zal gebeuren wanneer het segment van het logboek wordt gesloten.


Antwoord 12

Soms, als u een verzadigd cluster hebt (te veel partities of het gebruik van gecodeerde onderwerpgegevens of het gebruik van SSL, of de controller op een slecht knooppunt, of de verbinding flauw is, duurt het een lange tijd Purge zei het onderwerp.

Ik volg deze stappen, vooral als u AVRO gebruikt.

1: Uitvoeren met KAFKA-tools:

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2: Uitvoeren:

kafka-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3: Zet onderwerpbehoud terug naar de oorspronkelijke instelling, zodra het onderwerp leeg is.

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

Ik hoop dat dit iemand helpt, aangezien er niet gemakkelijk reclame voor wordt gemaakt.


Antwoord 13

De eenvoudigste manier is om de datum van de afzonderlijke logbestanden in te stellen op een oudere datum dan de bewaarperiode. Dan moet de makelaar ze binnen enkele seconden opruimen en voor u verwijderen. Dit biedt verschillende voordelen:

  1. Het is niet nodig om makelaars neer te halen, het is een runtime-operatie.
  2. Vermijdt de mogelijkheid van ongeldige offset-uitzonderingen (meer daarover hieronder).

In mijn ervaring met Kafka 0.7.x kan het verwijderen van de logbestanden en het herstarten van de broker leiden tot ongeldige offset-uitzonderingen voor bepaalde consumenten. Dit zou gebeuren omdat de makelaar de offsets opnieuw op nul start (bij afwezigheid van bestaande logbestanden), en een consument die eerder van het onderwerp consumeerde, opnieuw verbinding zou maken om een ​​specifieke [eenmaal geldige] offset aan te vragen. Als deze compensatie toevallig buiten de grenzen van de nieuwe onderwerplogs valt, kan dat geen kwaad en gaat de consument verder aan het begin of het einde. Maar als de offset binnen de grenzen van de nieuwe onderwerplogboeken valt, probeert de broker de berichtenset op te halen, maar dit mislukt omdat de offset niet overeenkomt met een daadwerkelijk bericht.

Dit kan worden verzacht door ook de consumentencompensaties in de dierentuinhouder voor dat onderwerp te wissen. Maar als je geen nieuw onderwerp nodig hebt en alleen de bestaande inhoud wilt verwijderen, dan is het simpelweg ‘aanraken’ van een paar onderwerplogboeken veel gemakkelijker en betrouwbaarder dan het stoppen van makelaars, het verwijderen van onderwerplogboeken en het wissen van bepaalde dierenverzorgersknooppunten .


Antwoord 14

Thomas’ advies is geweldig, maar helaas lijkt zkCliin oude versies van Zookeeper (bijvoorbeeld 3.3.6) rmrniet te ondersteunen. Vergelijk bijvoorbeeld de implementatie van de opdrachtregel in moderne Zookeepermet versie 3.3.

Als je te maken hebt met een oude versie van Zookeeper, is een oplossing het gebruik van een clientbibliotheek zoals zc.zkvoor Python. Voor mensen die niet bekend zijn met Python, je moet het installeren met behulp van pipof easy_install. Start vervolgens een Python-shell (python) en je kunt het volgende doen:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic') 

of zelfs

zk.delete_recursive('brokers')

als je alle onderwerpen uit Kafka wilt verwijderen.


Antwoord 15

Naast het updaten van retentie.ms en retentie.bytes, merkte ik dat het beleid voor het opschonen van onderwerpen “delete” (standaard) moet zijn, als het “compact” is, houdt het berichten langer vast, dwz als het “compact” is , moet u ook delete.retention.msopgeven.

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

moest ook de vroegste / nieuwste offsets volgen, moet hetzelfde zijn om te bevestigen dat dit met succes is gebeurd, kan ook de DU -H / TMP / KAFKA-logs / test-topic-3-100 – *

controleren

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762

Het andere probleem is, u moet huidige configuratie eerst krijgen, zodat u weet dat u terugkeert nadat het verwijderen is geslaagd:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics


Antwoord 16

Om alle berichten van een bepaald onderwerp op te ruimen met behulp van uw applicatiegroep (GroupName moet hetzelfde zijn als Application Kafka Group Name).

./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group


Antwoord 17

NIEUW, EERLIJKE handleiding, benadering voor het zuiveren van een onderwerp is:

in de makelaars:

  1. STOP KAFKA BROKER

    sudo service kafka stop
  2. Verwijder alle partitieblogbestanden (moet op alle makelaars worden uitgevoerd)
    sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*

in Zookeeper:

  1. Run Zookeper-opdrachtregelinterface

    sudo /usr/lib/zookeeper/bin/zkCli.sh
  2. Gebruik ZKCLI om het onderwerp metadata
    te verwijderen
    rmr /brokers/topic/<some_topic_name>

in de makelaars opnieuw:

  1. Start Broker Service

    sudo service kafka start

Antwoord 18

./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic

Dit zou retention.msmoeten geven. Dan kunt u boven de opdracht wijzigen om te wijzigen naar 1Second (en later terug naar standaard terug te keren).

Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000

Antwoord 19

Vanuit Java, met behulp van de nieuwe AdminZkClientin plaats van de verouderde AdminUtils:

 public void reset() {
    try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
        5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {
      for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
        deleteTopic(entry.getKey(), zkClient);
      }
    }
  }
  private void deleteTopic(String topic, KafkaZkClient zkClient) {
    // skip Kafka internal topic
    if (topic.startsWith("__")) {
      return;
    }
    System.out.println("Resetting Topic: " + topic);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    adminZkClient.deleteTopic(topic);
    // deletions are not instantaneous
    boolean success = false;
    int maxMs = 5_000;
    while (maxMs > 0 && !success) {
      try {
        maxMs -= 100;
        adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
        success = true;
      } catch (TopicExistsException ignored) {
      }
    }
    if (!success) {
      Assert.fail("failed to create " + topic);
    }
  }
  private Map<String, List<PartitionInfo>> listTopics() {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
    props.put("group.id", "test-container-consumer-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Map<String, List<PartitionInfo>> topics = consumer.listTopics();
    consumer.close();
    return topics;
  }

Antwoord 20

Als u dit programmatisch wilt doen binnen een Java-toepassing, kunt u de AdminClient’s API deleteRecordsgebruiken. Met behulp van de AdminClient kunt u records op partitie- en offsetniveau verwijderen.

Volgens de JavaDocsdeze bewerking wordt ondersteund door brokers met versie 0.11.0.0 of hoger.

Hier is een eenvoudig voorbeeld:

String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);
Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);
// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);
try {
  adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
  e.printStackTrace();
} catch (ExecutionException e) {
  e.printStackTrace();
} finally {
  adminClient.close();
}

Antwoord 21

De tijdelijke oplossing voor het tijdelijk verminderen van de bewaartijd voor een onderwerp, voorgesteld door user644265in deze answerwerkt nog steeds, maar recente versies van kafka-configszullen waarschuwen dat de maar de --zookeeperoptie is verouderd:

Waarschuwing: –zookeeper is verouderd en wordt verwijderd in een toekomstige versie van Kafka

Gebruik in plaats daarvan --bootstrap-server; bijvoorbeeld

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100

en

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms

Antwoord 22

# you have to enable this on config
sudo echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties 
sudo systemctl stop kafka 
sudo systemctl start kafka 
# purge the topic
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows
# create the topic
# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test
# list the topic
# /opt/kafka/bin/kafka-console-consumer.sh  localhost:9092 --topic flows --from-beginning

Antwoord 23

hier is de opdracht om het onderwerp te verwijderen als u confluentinc/cp-kafkacontainers gebruikt.

docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>

Geslaagde reactie:

Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Antwoord 24

Heeft u overwogen om uw app gewoon een nieuw hernoemd onderwerp te laten gebruiken? (d.w.z. een onderwerp dat dezelfde naam heeft als het oorspronkelijke onderwerp, maar met een “1” aan het einde).

Dat zou je app ook een nieuw, schoon onderwerp geven.

Other episodes