Apache Flume Plugin
The plugin reliably and efficiently streams large volumes of data and logs into HBase using the Phoenix API. The custom Phoenix sink and its event serializer are configured in the Flume configuration file for the agent. Currently, the only supported event serializer is RegexEventSerializer, which splits the Flume event body into columns using the regular expression specified in the configuration file.
1) Phoenix 3.0.0-SNAPSHOT or later
2) Flume 1.4.0 or later
1) Download Phoenix 3.0.0-SNAPSHOT from the [repo](https://github.com/forcedotcom/phoenix).
2) Follow the instructions at https://github.com/forcedotcom/phoenix/wiki/Building-Project to build the project, as the Flume plugin is still in beta.
3) Create a directory plugins.d within the $FLUME_HOME directory. Within it, create the subdirectories phoenix-sink/lib.
4) Copy the generated phoenix-3.0.0-SNAPSHOT.jar into $FLUME_HOME/plugins.d/phoenix-sink/lib.
Property Name | Default | Description |
---|---|---|
type | | Must be com.salesforce.phoenix.flume.sink.PhoenixSink |
batchSize | 100 | Default number of events per transaction. |
zookeeperQuorum | | ZooKeeper quorum of the HBase cluster. |
table | | The name of the table in HBase to write to. |
ddl | | The CREATE TABLE query for the HBase table into which the events will be upserted. If specified, the query is executed. Including the IF NOT EXISTS clause in the ddl is recommended. |
serializer | regex | Event serializer for processing the Flume event. Currently, only regex is supported. |
serializer.regex | (.*) | The regular expression for parsing the event. |
serializer.columns | | The columns that will be extracted from the Flume event for inserting into HBase. |
serializer.headers | | Headers of the Flume event that go as part of the UPSERT query. The data type for these columns is VARCHAR by default. |
serializer.rowkeyType | | A custom row key generator. Can be one of timestamp, date, uuid, random and nanotimestamp. Configure this when a custom row key value needs to be auto-generated and set for the primary key column. |
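
To make the serializer.rowkeyType options more concrete, the following is a rough sketch of the kind of value each option name suggests. It is not the plugin's code: the class RowKeySketch, the method generate, and the concrete value chosen for each type are all assumptions made for illustration, and the real generators in the sink may produce different types or formats.

```java
import java.util.Date;
import java.util.Locale;
import java.util.Random;
import java.util.UUID;

// Hypothetical illustration only; the Phoenix sink's own key generators may differ.
final class RowKeySketch {
    private static final Random RANDOM = new Random();

    // Returns a freshly generated key for the given rowkeyType name.
    static Object generate(String rowkeyType) {
        switch (rowkeyType.toLowerCase(Locale.ROOT)) {
            case "timestamp":
                return System.currentTimeMillis();    // milliseconds since the epoch (assumed)
            case "date":
                return new Date();                    // current date and time (assumed)
            case "uuid":
                return UUID.randomUUID().toString();  // random 36-character UUID string
            case "random":
                return RANDOM.nextLong();             // pseudo-random long (assumed)
            case "nanotimestamp":
                return System.nanoTime();             // JVM nanosecond counter (assumed)
            default:
                throw new IllegalArgumentException("Unknown rowkeyType: " + rowkeyType);
        }
    }

    public static void main(String[] args) {
        for (String type : new String[] {"timestamp", "date", "uuid", "random", "nanotimestamp"}) {
            System.out.println(type + " -> " + generate(type));
        }
    }
}
```

Whatever the generator produces, the primary key column declared in the ddl has to be of a compatible type; the example below pairs uuid with a VARCHAR key.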
Example configuration for ingesting Apache access logs into Phoenix. Here, UUID is used as the row key generator for the primary key.
# configuration for agent
agent.sources = spooling-source
agent.sinks = phoenix-sink
agent.channels = memoryChannel
# configuration for channel
agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.transactionCapacity=100
agent.channels.memoryChannel.byteCapacityBufferPercentage=20
# configuration for source
agent.sources.spooling-source.type=spooldir
agent.sources.spooling-source.channels = memoryChannel
agent.sources.spooling-source.spoolDir = /opt/logs
# configuration for interceptor
agent.sources.spooling-source.interceptors = i1
agent.sources.spooling-source.interceptors.i1.type = host
agent.sources.spooling-source.interceptors.i1.hostHeader = f_host
# configuration for sink
agent.sinks.phoenix-sink.type=com.salesforce.phoenix.flume.sink.PhoenixSink
agent.sinks.phoenix-sink.channel = memoryChannel
agent.sinks.phoenix-sink.batchSize = 100
agent.sinks.phoenix-sink.table = APACHE_LOGS
agent.sinks.phoenix-sink.ddl = CREATE TABLE IF NOT EXISTS APACHE_LOGS(uid VARCHAR NOT NULL, host VARCHAR ,identity VARCHAR ,user VARCHAR ,time VARCHAR ,method VARCHAR ,request VARCHAR,protocol VARCHAR,status INTEGER ,size INTEGER ,referer VARCHAR ,agent VARCHAR,f_host VARCHAR CONSTRAINT pk PRIMARY KEY (uid))
agent.sinks.phoenix-sink.zookeeperQuorum = localhost
agent.sinks.phoenix-sink.serializer = REGEX
agent.sinks.phoenix-sink.serializer.rowkeyType = uuid
agent.sinks.phoenix-sink.serializer.regex = ([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) \"([^ ]+) ([^ ]+) ([^\"]+)\" (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?
agent.sinks.phoenix-sink.serializer.columns = host,identity,user,time,method,request,protocol,status,size,referer,agent
agent.sinks.phoenix-sink.serializer.headers = f_host
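
To check how the regular expression in the example lines up with serializer.columns, it can be tried outside Flume with plain java.util.regex. The sketch below is not the RegexEventSerializer itself: the class AccessLogRegexDemo, the parse method and the sample log line are made up for illustration, and it assumes the serializer requires the whole event body to match and assigns capture groups to columns in order.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone check of the example regex; not part of the plugin.
final class AccessLogRegexDemo {
    // Same expression as serializer.regex in the example configuration.
    static final Pattern LOG_PATTERN = Pattern.compile(
        "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) \"([^ ]+) ([^ ]+) ([^\"]+)\" "
        + "(-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?");

    // Same order as serializer.columns: one column per capture group.
    static final String[] COLUMNS = {
        "host", "identity", "user", "time", "method", "request",
        "protocol", "status", "size", "referer", "agent"
    };

    // Maps each column name to its captured value; returns an empty map if the line does not match.
    static Map<String, String> parse(String line) {
        Map<String, String> row = new LinkedHashMap<>();
        Matcher m = LOG_PATTERN.matcher(line);
        if (m.matches()) {
            for (int i = 0; i < COLUMNS.length; i++) {
                row.put(COLUMNS[i], m.group(i + 1)); // null when the optional referer/agent part is absent
            }
        }
        return row;
    }

    public static void main(String[] args) {
        String sample = "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "
            + "\"GET /apache_pb.gif HTTP/1.0\" 200 2326 "
            + "\"http://www.example.com/start.html\" \"Mozilla/4.08\"";
        parse(sample).forEach((column, value) -> System.out.println(column + " = " + value));
    }
}
```

The expression has eleven capture groups, matching the eleven entries in serializer.columns; the uid and f_host columns of the table are filled by the row key generator and the host interceptor header rather than by the regex. Note that the referer and agent groups capture their surrounding double quotes when the log line quotes them.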
$ bin/flume-ng agent -f conf/flume-conf.properties -c ./conf -n agent
To monitor the agent and the sink process, enable JMX via the flume-env.sh script ($FLUME_HOME/conf/flume-env.sh). Ensure that the following line is uncommented.
JAVA_OPTS="-Xms1g -Xmx1g -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=3141 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
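
Once the agent is running with these options, any JMX client (for example jconsole) can attach to port 3141. As a minimal sketch, the following program connects with the standard javax.management.remote API and lists the registered MBeans. The class name FlumeJmxProbe and the helper serviceUrl are made up for this example, and the org.apache.flume.* domain filter is an assumption about how Flume names its MBeans; adjust it if nothing is listed.

```java
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical probe; requires a running agent with the JAVA_OPTS shown above.
final class FlumeJmxProbe {
    // Builds the standard RMI-based JMX service URL for a host and port.
    static String serviceUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(serviceUrl("localhost", 3141));
        // No credentials are passed because authentication is disabled in JAVA_OPTS.
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            // Assumed domain pattern for Flume's source, channel and sink MBeans.
            Set<ObjectName> names = mbeans.queryNames(new ObjectName("org.apache.flume.*:*"), null);
            for (ObjectName name : names) {
                System.out.println(name);
            }
        }
    }
}
```

Because authentication and SSL are both switched off in this setup, the JMX port should only be reachable from trusted hosts.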