Azure Data Lake Storage Gen1 Sink Connector for Confluent Platform¶
You can use the Kafka Connect Azure Data Lake Storage Gen1 Sink connector to export data from Apache Kafka® topics to Azure Data Lake Storage Gen1 files in either Avro or JSON formats. Depending on your environment, the Azure Data Lake Storage Gen1 Sink connector can export data by guaranteeing exactly-once delivery semantics to consumers of the Azure Data Lake Storage Gen1 files it produces.
The Azure Data Lake Storage Gen1 Sink connector periodically polls data from Kafka and, in turn, uploads it to Azure Data Lake Storage Gen1. A partitioner is used to split the data of every Kafka partition into chunks. Each chunk of data is represented as an Azure Data Lake Storage Gen1 file. The key name encodes the topic, the Kafka partition, and the start offset of this data chunk. If no partitioner is specified in the configuration, the default partitioner, which preserves Kafka partitioning, is used. The size of each data chunk is determined by the number of records written to Azure Data Lake Storage Gen1 and by schema compatibility.
Features¶
The Microsoft Azure Data Lake Storage Gen1 Sink connector includes the following features:
- Exactly once delivery
- Dead Letter Queue
- Multiple tasks
- Pluggable data format with or without schema
- Schema evolution
- Pluggable partitioner
Exactly once delivery¶
Records that are exported using a deterministic partitioner are delivered with exactly-once semantics.
Multiple tasks¶
The Azure Data Lake Storage Gen1 Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Multiple tasks may improve performance when moving a large amount of data.
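For example, a minimal sketch of this setting in a connector configuration might look like the following; the value of 3 is only illustrative:
# run up to three tasks for this connector
tasks.max=3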
Pluggable data format with or without schema¶
Out of the box, the connector supports writing data to Azure Data Lake Storage Gen1 in Avro and JSON format. Besides records with schema, the connector supports exporting plain JSON or byte array records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.
Important
You must use the AvroConverter, ProtobufConverter, or JsonSchemaConverter with ParquetFormat for this connector. Attempting to use the JsonConverter (with or without schemas) results in a NullPointerException and a StackOverflowException.
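A minimal sketch of a converter configuration that satisfies this requirement is shown below. It assumes a Schema Registry instance at the example URL; adjust the URL for your environment:
# schema-aware converter required when using ParquetFormat
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
format.class=io.confluent.connect.azure.storage.format.parquet.ParquetFormat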
Schema evolution¶
When schemas are used, the connector supports schema evolution based on schema compatibility modes. The available modes are NONE, BACKWARD, FORWARD, and FULL, and a selection can be made by setting the schema.compatibility property in the connector's configuration. When the connector observes a schema change, it decides whether to roll the file or project the record to the proper schema according to the schema.compatibility configuration in use.
Pluggable partitioner¶
The connector comes out of the box with partitioners that support default partitioning based on Apache Kafka® partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time-based partitioning by extending the TimeBasedPartitioner class.
Install the Azure Data Lake Storage Gen1 Sink Connector¶
You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.
Prerequisites¶
Note
You must install the connector on every machine where Connect will run.
-
An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.
-
An installation of the latest (latest) connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen1-storage:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen1-storage:1.1.1
-
You must authorize the connector user (or principal) to have write permissions on the uncommitted folder; otherwise, the connector will throw an exception.
Caution
You can't mix schema and schemaless records in storage using kafka-connect-storage-common. Attempting this causes a runtime exception. If you are using the self-managed version of this connector, this issue will be evident when you review the log files (only available for the self-managed connector).
Mapping records to Azure Data Lake Storage Gen1 Objects¶
The Azure Data Lake Storage Gen1 Sink connector consumes records from the specified topics, organizes them into different partitions, writes batches of records in each partition to a file, and then uploads those files to the Azure Data Lake Storage Gen1 bucket. It uses Azure Data Lake Storage Gen1 object paths that include the Kafka topic and partition, the computed partition, and the filename. The Azure Data Lake Storage Gen1 Sink connector offers several ways to customize this behavior, including:
- Controlling the names of the Azure Data Lake Storage Gen1 objects
- Determining how records are partitioned into Azure Data Lake Storage Gen1 objects
- The format used to serialize sets of records into Azure Data Lake Storage Gen1 objects
- When to upload Azure Data Lake Storage Gen1 objects
Azure Data Lake Storage Gen1 object names¶
The Azure Data Lake Storage Gen1 data model is a flat structure: each bucket stores objects, and the name of each Azure Data Lake Storage Gen1 object serves as the unique key. However, a logical hierarchy can be inferred when the Azure Data Lake Storage Gen1 object names use directory delimiters, such as /. The Azure Data Lake Storage Gen1 Sink connector allows you to customize the names of the Azure Data Lake Storage Gen1 objects it uploads to the Azure Data Lake Storage Gen1 bucket.
In general, the names of the Azure Data Lake Storage Gen1 objects uploaded by the Azure Data Lake Storage Gen1 Sink connector follow this format:
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
where:
- <prefix> is specified with the connector's topics.dir configuration property, which defaults to the literal value topics and helps create uniquely named Azure Data Lake Storage Gen1 objects that don't clash with existing Azure Data Lake Storage Gen1 objects in the same bucket.
- <topic> corresponds to the name of the Kafka topic from which the records in this Azure Data Lake Storage Gen1 object were read.
- <encodedPartition> is generated by the Azure Data Lake Storage Gen1 Sink connector's partitioner (see Partitioning records into Azure Data Lake Storage Gen1 objects).
- <kafkaPartition> is the Kafka partition number from which the records in this Azure Data Lake Storage Gen1 object were read.
- <startOffset> is the Kafka offset of the first record written to this Azure Data Lake Storage Gen1 object.
- <format> is the extension identifying the format in which the records are serialized in this Azure Data Lake Storage Gen1 object.
If desired, the / and + characters can be changed using the connector's directory.delim and file.delim configuration properties.
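For example, the following sketch restates the defaults described above; changing directory.delim or file.delim alters the object names accordingly:
topics.dir=topics
directory.delim=/
file.delim=+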
Partitioning records into Azure Data Lake Storage Gen1 objects¶
The Azure Data Lake Storage Gen1 Sink connector's partitioner determines how records read from a Kafka topic are partitioned into Azure Data Lake Storage Gen1 objects. The partitioner determines the <encodedPartition> portion of the Azure Data Lake Storage Gen1 object names (see Azure Data Lake Storage Gen1 object names).
The partitioner is specified in the connector configuration with the partitioner.class configuration property. The Azure Data Lake Storage Gen1 Sink connector comes with the following partitioners:
- Default Kafka Partitioner: The io.confluent.connect.storage.partitioner.DefaultPartitioner preserves the same topic partitions as in Kafka, and records from each topic partition ultimately end up in Azure Data Lake Storage Gen1 objects with names that include the Kafka topic and Kafka partition. The <encodedPartition> is always <topicName>/partition=<kafkaPartition>, resulting in Azure Data Lake Storage Gen1 object names such as <prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>.
- Field Partitioner: The io.confluent.connect.storage.partitioner.FieldPartitioner determines the partition from the field within each record identified by the connector's partition.field.name configuration property, which has no default. This partitioner requires STRUCT record type values. The <encodedPartition> is always <topicName>/<fieldName>=<fieldValue>, resulting in Azure Data Lake Storage Gen1 Sink connector object names of the form <prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>.<format>.
- Time Based Partitioner: The io.confluent.connect.storage.partitioner.TimeBasedPartitioner determines the partition from the year, month, day, hour, minutes, and/or seconds. This partitioner requires the following connector configuration properties:
  - The path.format configuration property specifies the pattern used for the <encodedPartition> portion of the Azure Data Lake Storage Gen1 object name. For example, when path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, Azure Data Lake Storage Gen1 object names have the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>.
  - The partition.duration.ms configuration property defines the maximum granularity of the Azure Data Lake Storage Gen1 objects within a single encoded partition directory. For example, setting partition.duration.ms=600000 (10 minutes) results in each Azure Data Lake Storage Gen1 object in that directory having no more than 10 minutes of records.
  - The locale configuration property specifies the JDK's locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version; see the available locales.
  - The timezone configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the "en_US" locale.
  - The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, or RecordField to extract the timestamp from one of the fields in the record's value as specified by the timestamp.field configuration property.
- Daily Partitioner: The io.confluent.connect.storage.partitioner.DailyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd and partition.duration.ms=86400000 (one day, for one Azure Data Lake Storage Gen1 object in each daily directory). This partitioner always results in Azure Data Lake Storage Gen1 object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner requires the following connector configuration properties:
  - The locale configuration property specifies the JDK's locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version; see the available locales.
  - The timezone configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the "en_US" locale.
  - The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, or RecordField to extract the timestamp from one of the fields in the record's value as specified by the timestamp.field configuration property.
- Hourly Partitioner: The io.confluent.connect.storage.partitioner.HourlyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and partition.duration.ms=3600000 (one hour, for one Azure Data Lake Storage Gen1 object in each hourly directory). This partitioner always results in Azure Data Lake Storage Gen1 object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner requires the following connector configuration properties:
  - The locale configuration property specifies the JDK's locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version; see the available locales.
  - The timezone configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the "en_US" locale.
  - The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, or RecordField to extract the timestamp from one of the fields in the record's value as specified by the timestamp.field configuration property.
As noted below, the choice of timestamp.extractor affects whether the Azure Data Lake Storage Gen1 Sink connector can support exactly-once delivery.
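For example, the following sketch configures hourly, record-timestamp-based partitioning. The property names come from the list above; the specific values are illustrative only:
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
partition.duration.ms=3600000
locale=en-US
timezone=UTC
timestamp.extractor=Record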
You can also choose to use a custom partitioner by implementing the io.confluent.connect.storage.partitioner.Partitioner interface, packaging your implementation into a JAR file, and then:
- Place the JAR file into the share/java/kafka-connect-azure-data-lake-gen1-storage directory of your Confluent Platform installation on each worker node.
- Restart all of the Connect worker nodes.
- Configure Azure Data Lake Storage Gen1 Sink connectors to use your fully-qualified partitioner class name.
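For example, assuming a hypothetical class com.example.MyCustomPartitioner packaged as described above, the connector configuration would reference it as follows:
partitioner.class=com.example.MyCustomPartitioner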
Azure Data Lake Storage Gen1 object formats¶
The Azure Data Lake Storage Gen1 Sink connector can serialize multiple records into each Azure Data Lake Storage Gen1 object using a number of formats. The connector's format.class configuration property identifies the name of the Java class that implements the io.confluent.connect.storage.format.Format interface, and the Azure Data Lake Storage Gen1 Sink connector comes with several implementations:
- Avro: Use format.class=io.confluent.connect.azure.storage.format.avro.AvroFormat to write the Azure Data Lake Storage Gen1 object as an Avro container file, which includes the Avro schema in the container file followed by one or more records. The connector's avro.codec configuration property specifies the Avro compression codec, and values can be null (the default) for no Avro compression, deflate to use the deflate algorithm as specified in RFC 1951, snappy to use Google's Snappy compression library, and bzip2 for BZip2 compression. Optionally set enhanced.avro.schema.support=true to enable enum symbol preservation and package name awareness.
- JSON: Use format.class=io.confluent.connect.azure.storage.format.json.JsonFormat to write the Azure Data Lake Storage Gen1 object as a single JSON array containing a JSON object for each record. The connector's az.compression.type configuration property can be set to none (the default) for no compression or gzip for GZip compression.
- Parquet: Use format.class=io.confluent.connect.azure.storage.format.parquet.ParquetFormat to write the Azure Data Lake Storage Gen1 object as a Parquet file in columnar storage format. The connector's parquet.codec configuration property specifies the Parquet compression codec, and values can be snappy (the default) to use Google's Snappy compression library, none for no compression, gzip to use GNU's GZip compression library, lzo to use the LZO (Lempel–Ziv–Oberhumer) compression library, brotli to use Google's Brotli compression library, lz4 to use the BSD-licensed LZ4 compression library, and zstd to use Facebook's ZStandard compression library.
- Raw Bytes: Use format.class=io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat to write the raw serialized record values delimited with the JDK's line separator to the Azure Data Lake Storage Gen1 object. This requires using value.converter=org.apache.kafka.connect.converters.ByteArrayConverter with the connector. Use a different delimiter by specifying the connector's format.bytearray.separator configuration property.
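As an example, the following sketch selects the Avro format with Snappy compression; both property names appear in the list above, and the codec choice is illustrative:
format.class=io.confluent.connect.azure.storage.format.avro.AvroFormat
avro.codec=snappy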
You can also choose to use a custom format by implementing the io.confluent.connect.storage.format.Format interface, packaging your implementation into a JAR file, and then:
- Place the JAR file into the share/java/kafka-connect-azure-data-lake-gen1-storage directory of your Confluent Platform installation on each worker node.
- Restart all of the Connect worker nodes.
- Configure Azure Data Lake Storage Gen1 Sink connectors with format.class set to the fully-qualified class name of your format implementation.
Azure Data Lake Storage Gen1 object uploads¶
As the Azure Data Lake Storage Gen1 Sink connector processes each record, it uses the partitioner to determine into which encoded partition the record should be written. This continues for each partition until the connector determines that a partition has enough records and should be flushed and uploaded to the Azure Data Lake Storage Gen1 bucket using the Azure Data Lake Storage Gen1 object name for that partition. This technique of knowing when to flush a partition file and upload it to Azure Data Lake Storage is called the rotation strategy, and there are a number of ways to control this behavior.
- Maximum number of records: The connector's flush.size configuration property specifies the maximum number of records that should be written to a single Azure Data Lake Storage Gen1 object. There is no default for this setting.
Important
Rotation strategy logic: In the following rotation strategies, the logic to flush files to storage is triggered when a new record arrives, after the defined interval or scheduled interval time. Flushing files is also triggered periodically by the offset.flush.interval.ms setting defined in the Connect worker configuration. The offset.flush.interval.ms setting defaults to 60000 ms (60 seconds). If you enable the properties rotate.interval.ms or rotate.schedule.interval.ms and the ingestion rate is low, you should set offset.flush.interval.ms to a smaller value so that records flush at the rotation interval (or close to the interval). Leaving offset.flush.interval.ms set to the default 60 seconds may cause records to stay in an open file for longer than expected, if no new records get processed that trigger rotation.
Maximum span of tape time: In this rotation strategy, the connector's
rotate.interval.ms
property specifies the maximum timespan in milliseconds a file tin can remain open and ready for additional records. The timestamp for each file starts with the record timestamp of the first tape written to the file, as adamant by the partitioner'stimestamp.extractor
. As long as the side by side record's timestamp fits within the timespan specified by therotate.interval.ms
property, the record is written to the file. If a record's timestamp does not fit within the timespan ofrotate.interval.ms
, the connector flushes the file, uploads it to Azure Data Lake Storage Gen1, and commits the offsets of the records in that file. After this, the connector creates a new file with a timespan that starts with the first tape, and writes the first record to the file.
- Scheduled rotation: In this rotation strategy, the connector's rotate.schedule.interval.ms specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. Unlike rotate.interval.ms, with scheduled rotation the timestamp for each file starts with the system time that the first record is written to the file. You must have the partitioner parameter timezone configured (it defaults to an empty string) when using this configuration property, otherwise the connector fails with an exception.
As long as a record is processed within the timespan specified by rotate.schedule.interval.ms, the record is written to the file. As soon as a new record is processed after the timespan for the current file, the file is flushed, uploaded to Azure Data Lake Storage Gen1, and the offsets of the records in the file are committed. A new file is created with a timespan that starts with the current system time, and the new record is written to the file. This configuration is useful when you have to commit your data based on current server time, for example at the beginning of every hour. The default value -1 means that this feature is disabled.
Scheduled rotation uses rotate.schedule.interval.ms to close the file and upload to Azure Data Lake Storage Gen1 on a regular basis using the current time, rather than the record time. Even if the connector has no more records to process, Connect will still call the connector at least every offset.flush.interval.ms, as defined in the Connect worker's configuration file. And every time this occurs, the connector uses the current time to determine if the currently opened file should be closed and uploaded to Azure Data Lake Storage Gen1.
These strategies can be combined as needed. However, when using either of the two rotation strategies described above, the connector only closes and uploads a file to Azure Data Lake Storage Gen1 when the next file does not belong based upon the timestamp. In other words, if the connector has no more records to process, the connector may keep the file open for a significant period of time, until the connector can process another record.
Note
Not all rotation strategies are compatible with the Azure Data Lake Storage Gen1 Sink connector's ability to deliver Azure Data Lake Storage Gen1 objects exactly once with eventual consistency. See the Exactly-once delivery section for details.
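A minimal sketch that combines a record-count limit with a record-time rotation is shown below; the values are illustrative and should be tuned to your ingestion rate:
# upload after at most 1000 records or 10 minutes of record time, whichever comes first
flush.size=1000
rotate.interval.ms=600000
timestamp.extractor=Record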
Quick Start¶
In this quick start, the Azure Data Lake Storage Gen1 Sink connector is used to export data produced by the Avro console producer to Azure Data Lake Storage Gen1.
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-azure-data-lake-gen1-storage:latest
Tip
By default, it will install the plugin into share/confluent-hub-components and add the directory to the plugin path. If this is the first connector you have installed, you may need to restart the Connect server for the plugin path change to take effect. Also see Azure Data Lake Gen1 CLI for setting up and using the Azure CLI.
Start the services using the Confluent CLI.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services start
Every service starts in order, printing a message with its status.
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Note
Make sure the Azure Data Lake Storage Gen1 Sink connector has write access to the Azure Data Lake Storage Gen1 account shown in azure.datalake.account.name and can deploy credentials successfully.
To import a few records with a simple schema into Kafka, start the Avro console producer as follows:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic datalake_topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
And then, in the console producer, enter the following:
{ "f1": "value1" } { "f1": "value2" } { "f1": "value3" } { "f1": "value4" } { "f1": "value5" } { "f1": "value6" } { "f1": "value7" } { "f1": "value8" } { "f1": "value9" }
The nine records entered are published to the Kafka topic datalake_topic
in Avro format.
Create a datalake.properties
file with the following contents:
name=datalake-sink
connector.class=io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector
tasks.max=1
topics=datalake_topic
flush.size=3
azure.datalake.client.id=<your client id>
azure.datalake.client.key=<your client key>
azure.datalake.account.name=<your account name>
azure.datalake.token.endpoint=<your azure oauth2 token endpoint>
format.class=io.confluent.connect.azure.storage.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Before starting the connector, make sure that the configurations in datalake.properties are properly set to your Azure Data Lake Storage Gen1 configuration. For this example, make sure that azure.datalake.account.name points to your Data Lake store, azure.datalake.client.id is set to your user ID, and azure.datalake.client.key is set to your user's secret key. The user ID or client ID should have permission to write to the Azure Data Lake Storage Gen1 account. Finally, set azure.datalake.token.endpoint to the OAuth 2 endpoint as described here, and use the v1 token endpoint. Then start the Azure Data Lake Storage Gen1 Sink connector by loading its configuration with the following command.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.
confluent local services connect connector load datalake-sink --config datalake.properties
{
  "name": "datalake-sink",
  "config": {
    "name": "datalake-sink",
    "connector.class": "io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector",
    "tasks.max": "1",
    "topics": "datalake_topic",
    "flush.size": "3",
    "azure.datalake.client.id": "<your client id>",
    "azure.datalake.client.key": "<your client key>",
    "azure.datalake.account.name": "<your account name>",
    "azure.datalake.token.endpoint": "<your azure oauth2 token endpoint>",
    "format.class": "io.confluent.connect.azure.storage.format.avro.AvroFormat",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  },
  "tasks": []
}
Check that the connector started successfully. Review the Connect worker's log by entering the following:
confluent local services connect log
Towards the end of the log you should see that the connector starts, logs a few messages, and then uploads data from Kafka to Azure Data Lake Storage Gen1.
Once the connector has ingested some records, check that the data is available in Azure Data Lake Storage Gen1. Use the following Azure CLI command:
az dls fs list --account <your account name> --path /topics
Once you navigate into the subfolders, you should see three objects with keys.
topics/datalake_topic/partition=0/datalake_topic+0+0000000000.avro
topics/datalake_topic/partition=0/datalake_topic+0+0000000003.avro
topics/datalake_topic/partition=0/datalake_topic+0+0000000006.avro
Each file is encoded as <topic>+<kafkaPartition>+<startOffset>.<format>.
To verify the contents, copy each file from Azure Data Lake Storage Gen1 to your local filesystem. Use the following Azure CLI command, changing the destination to what makes sense for you:
az dls fs download --account <your account name> --source-path /topics/datalake_topic/partition=0/datalake_topic+0+0000000000.avro --destination-path "C:\connect\datalake_topic+0+0000000000.avro"
Use avro-tools-1.9.0.jar (available in Apache mirrors) to print the records.
java -jar avro-tools-1.9.0.jar tojson datalake_topic+0+0000000000.avro
For the file above, you should see the following output:
{ "f1":"value1" } { "f1":"value2" } { "f1":"value3" }
The rest of the records are contained in the other two files.
Finally, stop the Connect worker and all other Confluent services by running:
confluent local services stop
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
You can stop all services and remove any data generated during this quick start by entering the following command:
confluent local destroy
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE
Exactly-once delivery¶
The Azure Data Lake Storage Gen1 Sink connector is able to provide exactly-once semantics to consumers of the objects it exports to Azure Data Lake Storage Gen1, if the connector is supplied with a deterministic partitioner.
Currently, out of the available partitioners, the default and field partitioners are always deterministic. TimeBasedPartitioner can be deterministic with some configurations as discussed below. This implies that when any of these partitioners is used, file splitting always happens at the same offsets for a given set of Kafka records. These partitioners take into account flush.size and schema.compatibility to decide when to roll and save a new file to Azure Data Lake Storage Gen1. The connector always delivers files in Azure Data Lake Storage Gen1 that contain the same records, even under the presence of failures. If a connector task fails before an upload completes, the file will still be in the temp/ folder. If, on the other hand, a failure occurs after the upload has completed but before the corresponding offset is committed to Kafka by the connector, then a re-upload will take place. However, this type of re-upload is transparent to the user of the Azure Data Lake Storage Gen1 folder, who at any time will have access to the same records made eventually available by successful uploads to Azure Data Lake Storage Gen1.
To guarantee exactly-once semantics with the TimeBasedPartitioner, the connector must be configured to use a deterministic implementation of TimestampExtractor and a deterministic rotation strategy. The deterministic timestamp extractors are Kafka records (timestamp.extractor=Record) or record fields (timestamp.extractor=RecordField). The deterministic rotation strategy configuration is rotate.interval.ms (setting rotate.schedule.interval.ms is nondeterministic and will invalidate exactly-once guarantees).
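Putting these constraints together, a sketch of a time-based configuration that preserves exactly-once semantics might look like the following; the values are illustrative:
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
# deterministic timestamp extractor: Record or RecordField
timestamp.extractor=Record
# deterministic rotation strategy; do not set rotate.schedule.interval.ms
rotate.interval.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
partition.duration.ms=3600000
locale=en-US
timezone=UTC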
Schema Evolution¶
The Azure Data Lake Storage Gen1 Sink connector supports schema evolution and reacts to schema changes of data according to the schema.compatibility configuration. This section explains how the connector reacts to schema evolution under different values of schema.compatibility. The schema.compatibility can be set to NONE, BACKWARD, FORWARD, and FULL, which means NO compatibility, BACKWARD compatibility, FORWARD compatibility, and FULL compatibility respectively.
- NO Compatibility: By default, the schema.compatibility is set to NONE. In this case, the connector ensures that each file written to Azure Data Lake Storage Gen1 has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with the new schema in new files.
BACKWARD Compatibility: If a schema is evolved in a backward compatible way, the connector can always use the latest schema to query all the data uniformly. For example, removing fields is a astern compatible change to a schema, since when the connector encounters records written with the old schema that contain these fields, the connector can merely ignore them. Adding a field with a default value is also backward compatible.
If
BACKWARD
is specified in theschema.compatibility
, the connector keeps rails of the latest schema used in writing information to Azure Information Lake Storage Gen1. If a information tape with a schema version larger than electric current latest schema arrives, the connector commits the current prepare of files and writes the data tape with new schema to new files. For data records arriving at a subsequently time that use an earlier schema, the connector projects the information record to the latest schema before writing to the same fix of files in Azure Information Lake Storage Gen1. -
FORWARD Compatibility: If a schema is evolved in a frontward compatible way, the connector can always use the oldest schema to query all the information uniformly. Removing a field that had a default value is frontward uniform, since the one-time schema will use the default value when the field is missing.
If
Frontward
is specified in theschema.compatibility
, the connector projects the data to the oldest schema before writing to the same set of files in Azure Data Lake Storage Gen1. -
Full Compatibility: Full compatibility means that old data tin exist read with the new schema and new information tin also exist read with the old schema.
If
Total
is specified in theschema.compatibility
, the connector performs the same action asBACKWARD
.
Schema evolution in the Azure Data Lake Storage Gen1 Sink connector works the same way as in Schema Evolution.
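For example, to have the connector track the latest schema and project older records to it, set the compatibility mode as follows; the choice of BACKWARD is illustrative:
schema.compatibility=BACKWARD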
Write JSON message values into Azure Data Lake Storage Gen1¶
The example settings file is shown below:
name=datalake-sink
connector.class=io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector
tasks.max=1
topics=datalake_topic
flush.size=100
# Required configuration
azure.datalake.client.id=<your client id>
azure.datalake.client.key=<your client key>
# The following define the data used to validate the license stored in Kafka
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
The first few settings are common to most connectors. topics specifies the topics to export data from, in this case datalake_topic. The property flush.size specifies the number of records per partition the connector needs to write before completing a multiblock upload to Azure Data Lake Storage Gen1.
The azure.datalake.client.id and azure.datalake.client.key are your required Azure credentials. This is a licensed Confluent connector. Enter the following for testing purposes. For more information, see Azure Data Lake Storage Gen1 Licensing.
azure.datalake.account.name=<your account name>
azure.datalake.token.endpoint=<your azure oauth2 token endpoint>
The next settings are specific to Azure Data Lake Storage Gen1. A mandatory setting is the name of your Azure Data Lake Gen1 store/account, azure.datalake.account.name, to host the exported Kafka records. Another mandatory configuration setting is azure.datalake.token.endpoint. The connector authenticates access to your data lake using this URL.
format.class=io.confluent.connect.azure.storage.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
These class settings are required to specify the output file format, which is currently io.confluent.connect.azure.storage.format.avro.AvroFormat, io.confluent.connect.azure.storage.format.json.JsonFormat, or io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat, and the partitioner class.
schema.compatibility=NONE
Finally, schema evolution is disabled in this example by setting schema.compatibility to NONE, as explained above.
For detailed descriptions of all the available configuration options of the Azure Data Lake Storage Gen1 Sink connector, go to Azure Data Lake Storage Gen1 Sink Connector Configuration Properties.
Write raw message values into Azure Data Lake Storage Gen1¶
It is possible to use the Azure Data Lake Storage Gen1 Sink connector to write out the unmodified original message values into newline-separated files in Azure Data Lake Storage Gen1. To accomplish this, configure Kafka Connect so it does not deserialize any of the messages, and configure the Azure Data Lake Storage Gen1 Sink connector to store the message values in a binary format in Azure Data Lake Storage Gen1.
The first part of the Azure Data Lake Storage Gen1 Sink connector configuration is similar to other examples.
name=datalake-raw-sink
connector.class=io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector
tasks.max=1
topics=datalake_topic
flush.size=3
The topics setting specifies the topics you want to export data from, which is datalake_topic in this example. The property flush.size specifies the number of records per partition the connector needs to write before completing an upload to Azure Data Lake Storage Gen1.
Next, configure the Data Lake account name, token endpoint, client credentials, and compression type.
azure.datalake.account.name=myconfluentdatalake
azure.datalake.token.endpoint=https://login.microsoftonline.com/a7d99622-a589-4520-8ce3-c280ed1cb00c/oauth2/token
azure.datalake.client.id=21aaeb79-1956-486a-bc36-baa1f710d567
azure.datalake.client.key=HGw@4@DSkjBRslXA4vuR:-lxQ4H3+PTs
az.compression.type=gzip
The next settings are specific to Azure Data Lake Storage Gen1. A mandatory setting is the account name of your Azure Data Lake Gen1 store, azure.datalake.account.name, which will host the exported Kafka records. Another mandatory configuration setting is azure.datalake.token.endpoint. The connector authenticates access to your data lake using this URL. The azure.datalake.client.id and azure.datalake.client.key are your required Azure client credentials.
The az.compression.type setting specifies that the Azure Data Lake Storage Gen1 Sink connector should compress all Azure Data Lake Storage Gen1 files using GZIP compression, adding the .gz extension to any files (see below).
This example configuration is typical of most Azure Data Lake Storage Gen1 Sink connectors. Now, configure the connector to read the raw message values and write them in binary format:
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.azure.storage.format.bytearray.ByteArrayFormat
schema.compatibility=NONE
The value.converter setting overrides the connector default in the Connect worker configuration. ByteArrayConverter is used to instruct Connect to skip deserializing the message values and instead provide the message values in their raw binary form. The format.class setting is used to instruct the Azure Data Lake Storage Gen1 Sink connector to write these binary message values as-is into Azure Data Lake Storage Gen1 files. By default the messages written to the same Azure Data Lake Storage Gen1 file are separated by a newline character sequence, but you can control this with the format.bytearray.separator setting. You may want to consider setting this if your messages might contain newlines. Also, by default the files written to Azure Data Lake Storage Gen1 have an extension of .bin (before compression, if enabled), or you can use the format.bytearray.extension setting to change the pre-compression filename extension.
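As a sketch, the following settings would separate records with a custom delimiter and change the pre-compression extension; both values are illustrative:
format.bytearray.separator=;
format.bytearray.extension=.dat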
Next, you need to decide how you want to partition the consumed messages in Azure Data Lake Storage Gen1 files. You have a few options, including the default partitioner that preserves the same partitions as in Kafka:
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
Or, you could partition using the timestamp of the Kafka messages.
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record
Or, you can use the timestamp at which the Azure Data Lake Storage Gen1 Sink connector processes each message.
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock
Custom partitioners are always an option, as well. Just be aware that since the record value is an opaque binary value, Connect cannot extract timestamps from fields using the RecordField option.
The Azure Data Lake Storage Gen1 Sink connector configuration outlined above results in newline-delimited gzipped objects in Azure Data Lake Storage Gen1 with names ending in .bin.gz.